voice/google-gemini-live-api/architecture-llm.md
This document provides a comprehensive architecture reference for LLMs working with @mastra/voice-google-gemini-live.
Purpose: Wrapper for Google's Gemini Live API providing real-time multimodal voice interactions
Base Class: Extends MastraVoice from @mastra/core/voice
Key Capabilities: Bidirectional audio streaming, tool calling, VAD, session management, multi-auth support
```typescript
class GeminiLiveVoice extends MastraVoice<
  GeminiLiveVoiceConfig, // Configuration
  GeminiLiveVoiceOptions, // Runtime options
  GeminiLiveVoiceOptions, // Additional options
  ToolsInput, // Tools type
  GeminiLiveEventMap // Event map
>
```
```
src/
├── index.ts                  # Main GeminiLiveVoice class
├── types.ts                  # Type definitions
├── utils/errors.ts           # Error handling
└── managers/
    ├── AudioStreamManager.ts # Audio processing
    ├── AuthManager.ts        # Authentication
    ├── ConnectionManager.ts  # WebSocket management
    ├── ContextManager.ts     # Conversation context
    ├── EventManager.ts       # Event system
    └── SessionManager.ts     # Session lifecycle
```
Responsibility: Authentication for both Gemini API and Vertex AI
Authentication Methods:
- API key (Gemini API): passed via the `x-goog-api-key` header
- OAuth (Vertex AI): access token from Google Cloud credentials

Token Caching: 50-minute cache for OAuth tokens (60-minute expiry)
Key Methods:
- `initialize()`: Set up auth client
- `getAccessToken()`: Get cached or fresh OAuth token
- `clearCache()`: Clear token cache

Responsibility: WebSocket connection lifecycle
States: disconnected → connecting → connected → disconnected
Key Methods:
- `setWebSocket(ws)`: Store WebSocket instance
- `waitForOpen()`: Promise-based connection wait (30s timeout)
- `send(data)`: Validated send operation
- `isConnected()`: Check WebSocket.OPEN state

Responsibility: Audio format conversion and stream management
Audio Config: 16-bit PCM, base64-encoded over the wire; responses arrive at a 24000 Hz sample rate (see the `speaking` event)
Limits: bounded number of concurrent speaker streams (exceeding it raises `STREAM_LIMIT_EXCEEDED`)
Key Methods:
- `processAudioChunk(chunk)`: Convert Buffer → Int16Array → Base64
- `createAudioMessage(data, type)`: Wrap audio in API format
- `createSpeakerStream(id)`: Create PassThrough stream for response
- `cleanupSpeakerStreams()`: Remove all active streams

Responsibility: Conversation history with optional compression
Context Entry:
```typescript
{ role: 'user' | 'assistant', content: string, timestamp: number }
```
Limits: bounded history length (older entries are auto-truncated or compressed)
Key Methods:
- `addEntry(role, content)`: Add with auto-truncation
- `getContextHistory()`: Get full history
- `compressContext()`: Smart compression
- `clearContext()`: Clear all history

Responsibility: Type-safe event system
Generic Implementation:
```typescript
class EventManager<TEvents extends EventMap> {
  emit<K extends keyof TEvents>(event: K, data: TEvents[K]): boolean;
  on<E extends keyof TEvents>(event: E, callback: (data: TEvents[E]) => void): void;
  once<E extends keyof TEvents>(event: E, callback: (data: TEvents[E]) => void): void;
}
```
Features: Type safety, listener tracking, memory leak prevention, debug logging
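As a concrete illustration of the signatures above, here is a minimal, self-contained sketch of a typed event manager with once-semantics and cleanup. This is illustrative only, not the package's actual implementation:

```typescript
type EventMap = Record<string, unknown>;

class EventManager<TEvents extends EventMap> {
  // One listener set per event name; tracked so cleanup() can remove them all.
  private listeners = new Map<keyof TEvents, Set<(data: any) => void>>();

  on<K extends keyof TEvents>(event: K, callback: (data: TEvents[K]) => void): void {
    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event)!.add(callback);
  }

  once<K extends keyof TEvents>(event: K, callback: (data: TEvents[K]) => void): void {
    // Self-removing wrapper: fires at most once.
    const wrapper = (data: TEvents[K]) => {
      this.listeners.get(event)?.delete(wrapper);
      callback(data);
    };
    this.on(event, wrapper);
  }

  emit<K extends keyof TEvents>(event: K, data: TEvents[K]): boolean {
    const set = this.listeners.get(event);
    if (!set || set.size === 0) return false;
    // Copy before iterating so listeners may remove themselves safely.
    for (const cb of [...set]) cb(data);
    return true;
  }

  // Removing all listeners on disconnect() is what prevents memory leaks.
  cleanup(): void {
    this.listeners.clear();
  }
}
```

The `keyof TEvents` constraint is what gives compile-time safety: passing the wrong payload shape for a given event name is a type error.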
Connection flow:

1. `connect()` called
2. `AuthManager.initialize()` - get credentials
3. `ConnectionManager.waitForOpen()` - wait for OPEN state
4. Receive `setupComplete` message
5. State becomes `connected`, emit session event

Text flow:

- Send text as a `client_content` message
- Send `session.update` for runtime options
- Responses arrive as `serverContent`

Audio flow:

- Send audio as a `realtime_input` message
- Responses arrive as a `serverContent` message with `modelTurn.parts[]`
- Emit `speaker` event (stream) and `speaking` event (chunk)
- On `turnComplete`: cleanup streams

IMPORTANT: Gemini sends tool calls in TWO formats:
Format 1 (Legacy - still supported):
```json
{ "toolCall": { "name": "...", "args": {...}, "id": "..." } }
```
Format 2 (Actual Gemini format):
```json
{
  "serverContent": {
    "modelTurn": {
      "parts": [
        { "functionCall": { "name": "...", "args": {...} } }
      ]
    }
  }
}
```
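Because both formats must be handled, a dispatcher can normalize them into one shape before tool lookup. A minimal sketch, with field names taken from the messages above (`extractToolCalls` is a hypothetical helper, not the package's code):

```typescript
interface NormalizedToolCall {
  name: string;
  args: Record<string, unknown>;
  id?: string;
}

function extractToolCalls(message: any): NormalizedToolCall[] {
  // Format 1: legacy top-level toolCall
  if (message.toolCall) {
    const { name, args, id } = message.toolCall;
    return [{ name, args: args ?? {}, id }];
  }
  // Format 2: functionCall parts inside serverContent.modelTurn.parts[]
  const parts: any[] = message.serverContent?.modelTurn?.parts ?? [];
  return parts
    .filter(p => p.functionCall)
    .map(p => ({ name: p.functionCall.name, args: p.functionCall.args ?? {} }));
}
```

Normalizing early keeps `handleToolCall()` format-agnostic and makes the "args always `{}`" class of bug (see the case study at the end of this document) easier to test for.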
Processing:
1. `handleServerContent` checks each part for `functionCall`
2. Matching parts are converted to the `toolCall` format
3. `handleToolCall()` looks up the tool in `this.tools`
4. `tool.execute(args, { requestContext })` runs the tool
5. The `tool_result` is sent back to the server

```typescript
interface GeminiLiveEventMap {
  speaker: NodeJS.ReadableStream; // Concatenated audio per response
  speaking: {
    // Individual chunks
    audio?: string; // Base64
    audioData?: Int16Array; // Raw PCM
    sampleRate?: number; // 24000
  };
  writing: {
    text: string;
    role: 'assistant' | 'user';
  };
  session: {
    state: 'connecting' | 'connected' | 'disconnected' | 'disconnecting' | 'updated';
    config?: Record<string, unknown>;
  };
  toolCall: {
    name: string;
    args: Record<string, any>;
    id: string;
  };
  error: {
    message: string;
    code?: string;
    details?: unknown;
  };
  usage: {
    inputTokens: number;
    outputTokens: number;
    totalTokens: number;
    modality: string;
  };
  turnComplete: { timestamp: number };
  vad: { type: 'start' | 'end'; timestamp: number };
  interrupt: { type: 'user' | 'model'; timestamp: number };
}
```
```typescript
// Internal
this.emit('speaking', data);

// External
voice.on('speaking', data => {
  /* handle */
});
```
Format 1 - Mastra Tool (via addTools):
```typescript
const tool = {
  id: 'getWeather',
  description: 'Get weather',
  inputSchema: z.object({ location: z.string() }),
  execute: async (args, context) => {
    return result;
  },
};
voice.addTools({ getWeather: tool });
```
Format 2 - Gemini Tool Config (via constructor):
```typescript
const tools: GeminiToolConfig[] = [
  {
    name: 'getWeather',
    description: 'Get weather',
    parameters: {
      type: 'object',
      properties: { location: { type: 'string' } },
      required: ['location'],
    },
  },
];
const voice = new GeminiLiveVoice({ tools });
```
```typescript
// In handleToolCall
const tool = this.tools[toolName];
const result = await tool.execute(toolArgs, { requestContext: this.requestContext });

// Send result back
this.sendEvent('tool_result', {
  tool_result: { tool_call_id: toolId, result },
});
```
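The lookup-execute-respond sequence above can be sketched end to end. Here `runToolCall` is a hypothetical standalone helper; `tools` and `requestContext` stand in for instance state:

```typescript
type Tool = {
  execute: (args: Record<string, unknown>, ctx: { requestContext: unknown }) => Promise<unknown>;
};

async function runToolCall(
  tools: Record<string, Tool>,
  call: { name: string; args: Record<string, unknown>; id: string },
  requestContext: unknown,
) {
  const tool = tools[call.name];
  if (!tool) {
    // Mirrors the TOOL_NOT_FOUND error code.
    throw new Error(`TOOL_NOT_FOUND: ${call.name}`);
  }
  const result = await tool.execute(call.args, { requestContext });
  // This object is what gets sent back over the WebSocket.
  return { tool_result: { tool_call_id: call.id, result } };
}
```

Note that the tool's `id` from the incoming message must be echoed back as `tool_call_id`, or the server cannot match the result to the pending call.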
Zod schemas automatically converted to JSON Schema:
- ZodString → `{ type: 'string' }`
- ZodNumber → `{ type: 'number' }`
- ZodBoolean → `{ type: 'boolean' }`
- ZodObject → `{ type: 'object', properties: {...}, required: [...] }`
- ZodEnum → `{ type: 'string', enum: [...] }`

```typescript
enum GeminiLiveErrorCode {
  CONNECTION_FAILED,
  CONNECTION_NOT_ESTABLISHED,
  WEBSOCKET_ERROR,
  AUTHENTICATION_FAILED,
  API_KEY_MISSING,
  PROJECT_ID_MISSING,
  AUDIO_PROCESSING_ERROR,
  AUDIO_STREAM_ERROR,
  SPEAKER_STREAM_ERROR,
  INVALID_AUDIO_FORMAT,
  TOOL_EXECUTION_ERROR,
  TOOL_NOT_FOUND,
  SESSION_CONFIG_UPDATE_FAILED,
  SESSION_RESUMPTION_FAILED,
  NOT_CONNECTED,
  INVALID_STATE,
  STREAM_LIMIT_EXCEEDED,
  TRANSCRIPTION_TIMEOUT,
  TRANSCRIPTION_FAILED,
  UNKNOWN_ERROR,
}
```
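The `createAndEmitError` helper below assumes an error type carrying a code and a `toEventData()` serializer. A minimal sketch consistent with the `error` event shape (illustrative, not the package's actual class):

```typescript
class GeminiLiveError extends Error {
  constructor(
    public readonly code: string,
    message: string,
    public readonly details?: unknown,
  ) {
    super(message);
    this.name = 'GeminiLiveError';
  }

  // Matches the GeminiLiveEventMap 'error' payload: { message, code?, details? }
  toEventData(): { message: string; code?: string; details?: unknown } {
    return { message: this.message, code: this.code, details: this.details };
  }
}
```

Keeping the thrown error and the emitted event payload derived from one object means callers who `try/catch` and callers who subscribe to `error` see consistent information.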
```typescript
createAndEmitError(code, message, details?) {
  const error = new GeminiLiveError(code, message, details);
  this.log(`Error [${code}]: ${message}`, details);
  this.emit('error', error.toEventData());
  return error;
}
```
- `sessionId`: Unique identifier (UUID)
- `sessionHandle`: For resumption
- `sessionStartTime`: Start timestamp
- `isResuming`: Flag for resumption
- `sessionDurationTimeout`: Duration monitor

```typescript
interface GeminiSessionConfig {
  enableResumption?: boolean; // Save handle for reconnection
  maxDuration?: string; // '24h', '2h', '30m'
  contextCompression?: boolean; // Auto-compress context
  vad?: {
    enabled?: boolean;
    sensitivity?: number; // 0-1
    silenceDurationMs?: number;
  };
  interrupts?: {
    enabled?: boolean;
    allowUserInterruption?: boolean;
  };
}
```
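Putting the session options together, a hypothetical constructor call might look like the following. Note the `apiKey` field, the `sessionConfig` option name, and the `GOOGLE_API_KEY` variable are assumptions for illustration, not confirmed API:

```typescript
import { GeminiLiveVoice } from '@mastra/voice-google-gemini-live';

// Hypothetical configuration sketch; exact field names may differ.
const voice = new GeminiLiveVoice({
  apiKey: process.env.GOOGLE_API_KEY, // assumed auth field
  sessionConfig: {
    enableResumption: true, // save sessionHandle for reconnection
    maxDuration: '2h',
    contextCompression: true,
    vad: { enabled: true, sensitivity: 0.5, silenceDurationMs: 800 },
    interrupts: { enabled: true, allowUserInterruption: true },
  },
});
```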
1. Save `sessionHandle` on disconnect (if `enableResumption`)
2. Call `resumeSession(handle, context?)`
3. `isResuming = true` is set
4. On `connect()`, send `session_resume` instead of `setup`

Each concern is delegated to a specialized manager. Benefits: separation of concerns, testability, maintainability.
Loose coupling via typed events. Benefits: flexibility, extensibility, clear data flow.
AudioStreamManager receives sender callback. Benefits: testability, loose coupling.
PassThrough streams with metadata, limits, and auto-cleanup. Benefits: prevents memory leaks, handles errors.
Consistent validation methods throw typed errors. Benefits: fail fast, clear messages.
Support multiple config formats internally normalized. Benefits: backward compatibility, flexibility.
All listeners removed on disconnect() via eventManager.cleanup().
OAuth tokens cached for 50 minutes to avoid expensive requests.
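The caching policy can be sketched as a small wrapper with an injectable clock (illustrative; the real `AuthManager` internals may differ):

```typescript
// Tokens expire after ~60 minutes, so refreshing at 50 leaves a safety margin.
const CACHE_TTL_MS = 50 * 60 * 1000;

class TokenCache {
  private token: string | null = null;
  private fetchedAt = 0;

  constructor(
    private fetchToken: () => Promise<string>,
    private now: () => number = Date.now, // injectable clock for testing
  ) {}

  async getAccessToken(): Promise<string> {
    if (this.token && this.now() - this.fetchedAt < CACHE_TTL_MS) {
      return this.token; // still fresh: skip the expensive auth round-trip
    }
    this.token = await this.fetchToken();
    this.fetchedAt = this.now();
    return this.token;
  }

  clearCache(): void {
    this.token = null;
    this.fetchedAt = 0;
  }
}
```

Injecting the clock is what makes the 50-minute boundary unit-testable without real waiting.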
Setup:

```json
{
  "setup": {
    "model": "models/gemini-2.0-flash-exp",
    "systemInstruction": { "parts": [{ "text": "..." }] },
    "generationConfig": { "responseModalities": ["AUDIO", "TEXT"], "speechConfig": {...} },
    "tools": [{ "functionDeclarations": [{...}] }]
  }
}
```
Audio input (`realtime_input`):

```json
{
  "realtime_input": {
    "media_chunks": [{ "mime_type": "audio/pcm", "data": "base64..." }]
  }
}
```
Text input (`client_content`):

```json
{
  "client_content": {
    "turns": [{ "role": "user", "parts": [{ "text": "..." }] }],
    "turnComplete": true
  }
}
```
Server response (`serverContent`):

```json
{
  "serverContent": {
    "modelTurn": {
      "parts": [
        { "text": "..." },
        { "inlineData": { "mimeType": "audio/pcm", "data": "base64..." } },
        { "functionCall": { "name": "getWeather", "args": { "location": "Tokyo" } } }
      ]
    },
    "turnComplete": true
  }
}
```
Tool result (`tool_result`):

```json
{
  "tool_result": {
    "tool_call_id": "call_123",
    "result": { "temperature": 72 }
  }
}
```
```typescript
handleGeminiMessage(data) {
  if (data.setup || data.setupComplete) handleSetupComplete(data);
  else if (data.serverContent) handleServerContent(data.serverContent);
  else if (data.toolCall) handleToolCall(data);
  else if (data.usageMetadata) handleUsageUpdate(data);
  else if (data.error) handleError(data.error);
}
```
```typescript
handleServerContent(data) {
  for (const part of data.modelTurn.parts) {
    if (part.text) emit('writing', { text: part.text, role: 'assistant' });
    if (part.functionCall) {
      // Convert to toolCall format and handle
      handleToolCall({ toolCall: { name, args, id } });
    }
    if (part.inlineData?.mimeType?.includes('audio')) {
      // Process audio, emit 'speaking' and 'speaker' events
    }
  }
  if (data.turnComplete) {
    cleanupSpeakerStreams();
    emit('turnComplete', { timestamp: Date.now() });
  }
}
```
```typescript
// Input: Buffer → Int16Array → Base64 → WebSocket
processAudioChunk(chunk: Buffer): string {
  const int16Array = new Int16Array(chunk.buffer, chunk.byteOffset, chunk.byteLength / 2);
  return int16ArrayToBase64(int16Array);
}

// Output: Base64 → Int16Array → Buffer → Stream
handleAudioOutput(base64: string) {
  const int16Array = base64ToInt16Array(base64);
  const buffer = Buffer.from(int16Array.buffer);
  speakerStream.write(buffer);
}
```
Key code paths for tool calling:

- `types.ts`
- `handleServerContent` for `functionCall` detection
- `this.tools`
- `execute` is called with args as first parameter
- `GeminiLiveServerMessage` type
- `handleGeminiMessage`

Problem: Tool calls triggered but args always `{}`
Root Cause: Only checked for top-level toolCall, but Gemini sends as serverContent.modelTurn.parts[].functionCall
Fix:
1. Add `functionCall` to the parts type definition
2. Handle it in `handleServerContent`:

```typescript
if (part.functionCall) {
  const toolCallData = { toolCall: { name, args, id } };
  handleToolCall(toolCallData);
}
```
Key Lesson: Always check actual API message format, not just documentation.