This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.
The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.
Use this skill when:
- Building a real-time, bidirectional voice agent (phone, web, or in-app)
- Integrating streaming speech-to-text, LLM, or text-to-speech providers
- Implementing interrupt handling (barge-in) for natural conversations
- Supporting multiple STT/LLM/TTS providers behind one interface
Every voice AI engine follows this pipeline:
```
Audio In → Transcriber → Agent → Synthesizer → Audio Out
            (Worker 1)  (Worker 2)  (Worker 3)
```
Key Benefits:
- Each stage runs concurrently, so transcription, generation, and synthesis overlap
- Queues decouple the stages, so any worker can be swapped or restarted independently
- Interrupts can be broadcast to every in-flight event at once
- Audio streams through the pipeline instead of being processed in batch
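A minimal sketch of how the stages chain together via shared queues, assuming the BaseWorker class shown in the next section (in a real engine each stage is a concrete subclass; the wiring here is illustrative, not a fixed API):

```python
import asyncio

# Illustrative wiring: each stage consumes its predecessor's output queue.
audio_in = asyncio.Queue()         # raw audio chunks from the client
transcriptions = asyncio.Queue()   # Transcription objects
agent_responses = asyncio.Queue()  # text responses from the LLM
audio_out = asyncio.Queue()        # synthesized audio chunks

# Real engines use Transcriber/Agent/Synthesizer subclasses of BaseWorker
transcriber = BaseWorker(input_queue=audio_in, output_queue=transcriptions)
agent = BaseWorker(input_queue=transcriptions, output_queue=agent_responses)
synthesizer = BaseWorker(input_queue=agent_responses, output_queue=audio_out)

for worker in (transcriber, agent, synthesizer):
    worker.start()
```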
Every worker follows this pattern:
```python
import asyncio

class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue    # asyncio.Queue to consume from
        self.output_queue = output_queue  # asyncio.Queue to produce to
        self.active = False
        self._task = None

    def start(self):
        """Start the worker's processing loop."""
        self.active = True
        # Keep a reference so the task isn't garbage-collected mid-run
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Main processing loop - runs until terminated."""
        while self.active:
            item = await self.input_queue.get()  # Block until an item arrives
            await self.process(item)             # Process the item

    async def process(self, item):
        """Override this - does the actual work."""
        raise NotImplementedError

    def terminate(self):
        """Stop the worker."""
        self.active = False
```
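A quick usage sketch of the pattern; the EchoWorker name and behavior are made up for illustration:

```python
class EchoWorker(BaseWorker):
    """Toy worker: passes items straight through to the next stage."""
    async def process(self, item):
        await self.output_queue.put(item)

async def main():
    inbox, outbox = asyncio.Queue(), asyncio.Queue()
    worker = EchoWorker(inbox, outbox)
    worker.start()
    inbox.put_nowait("hello")
    print(await outbox.get())  # -> "hello"
    worker.terminate()

asyncio.run(main())
```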
Purpose: Converts incoming audio chunks to text transcriptions
Interface Requirements:
```python
import asyncio

class BaseTranscriber:
    def __init__(self, transcriber_config):
        self.input_queue = asyncio.Queue()   # Audio chunks (bytes)
        self.output_queue = asyncio.Queue()  # Transcriptions
        self.is_muted = False

    def send_audio(self, chunk: bytes):
        """Client calls this to send audio."""
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Send silence instead (prevents echo during bot speech)
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

    def create_silent_chunk(self, size: int) -> bytes:
        """Zeroed PCM bytes, same length as the real chunk."""
        return b"\x00" * size

    def mute(self):
        """Called when bot starts speaking (prevents echo)."""
        self.is_muted = True

    def unmute(self):
        """Called when bot stops speaking."""
        self.is_muted = False
```
Output Format:
```python
from dataclasses import dataclass

@dataclass
class Transcription:
    message: str                 # "Hello, how are you?"
    confidence: float            # 0.95
    is_final: bool               # True = complete sentence, False = partial
    is_interrupt: bool = False   # Set by TranscriptionsWorker
```
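One common way to use is_final, sketched here as an assumption rather than any particular library's rule: forward only final transcriptions to the agent, and use partials purely for early barge-in detection:

```python
# Hedged sketch: partial vs. final transcription handling.
async def handle_transcription(self, transcription: Transcription):
    if not transcription.is_final:
        # Partial results arrive quickly; useful for detecting barge-in early
        return
    # Final results carry a complete utterance worth sending to the agent
    await self.agent.input_queue.put(transcription)
```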
Supported Providers: Deepgram, AssemblyAI, Azure Speech, Google Speech-to-Text (see the factory in the multi-provider section).
Critical Implementation Details:
- Run the provider's send and receive streams concurrently with asyncio.gather()

Purpose: Processes user input and generates conversational responses
Interface Requirements:
```python
import asyncio

class BaseAgent:
    def __init__(self, agent_config):
        self.input_queue = asyncio.Queue()   # TranscriptionAgentInput
        self.output_queue = asyncio.Queue()  # AgentResponse
        self.transcript = None               # Conversation history

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        """Override this - returns an AsyncGenerator of responses."""
        raise NotImplementedError
```
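As one concrete example, a minimal OpenAI-backed agent might look like the sketch below. The model name is an illustrative assumption and error handling is omitted; this is a sketch against the openai v1+ SDK, not the engine's canonical implementation:

```python
from openai import AsyncOpenAI  # assumes the openai v1+ SDK

class OpenAIAgent(BaseAgent):
    def __init__(self, agent_config):
        super().__init__(agent_config)
        self.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        stream = await self.client.chat.completions.create(
            model="gpt-4o-mini",  # illustrative model choice
            messages=[{"role": "user", "content": human_input}],
            stream=True,
        )
        full_response = ""
        async for chunk in stream:
            full_response += chunk.choices[0].delta.content or ""
        # Buffer the full response before yielding (see the buffering pitfall below)
        yield full_response
```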
Why Streaming Responses? Streaming lets the synthesizer start producing audio before the LLM finishes generating, so the first spoken words arrive within a fraction of a second instead of after the full completion. It also gives the interrupt system natural cancellation points between chunks.
Supported Providers: OpenAI, Google Gemini (see the factory in the multi-provider section).
Critical Implementation Details:
- Maintain conversation history in a Transcript object (see State Management below)
- Return responses as an AsyncGenerator so downstream workers can start early

Purpose: Converts agent text responses to speech audio
Interface Requirements:
```python
class BaseSynthesizer:
    async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
        """
        Returns a SynthesisResult containing:
        - chunk_generator: AsyncGenerator that yields audio chunks
        - get_message_up_to: Function to get partial text (for interrupts)
        """
        raise NotImplementedError
```
SynthesisResult Structure:
```python
from dataclasses import dataclass
from typing import AsyncGenerator, Callable

@dataclass
class ChunkResult:
    chunk: bytes          # Raw PCM audio
    is_last_chunk: bool

@dataclass
class SynthesisResult:
    chunk_generator: AsyncGenerator[ChunkResult, None]
    get_message_up_to: Callable[[float], str]  # seconds → partial text
```
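A simple way to implement get_message_up_to when the TTS provider gives no word-level timing data is to interpolate characters over the total audio length. This is a rough heuristic sketch (the helper name is made up), not any provider's API:

```python
def make_get_message_up_to(message: str, total_audio_seconds: float):
    """Estimate the text spoken after N seconds by linear interpolation."""
    def get_message_up_to(seconds: float) -> str:
        if total_audio_seconds <= 0 or seconds >= total_audio_seconds:
            return message
        chars = int(len(message) * seconds / total_audio_seconds)
        # Cut on a word boundary so the transcript stays readable
        return message[:chars].rsplit(" ", 1)[0]
    return get_message_up_to
```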
Supported Providers: ElevenLabs, Azure, Google, Amazon Polly, Play.ht (see the factory in the multi-provider section).
Critical Implementation Details:
- Implement get_message_up_to() for interrupt handling

Purpose: Sends synthesized audio back to the client
CRITICAL: Rate Limiting for Interrupts
```python
import asyncio
import time

async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check for interrupt
        if stop_event.is_set():
            logger.debug(f"Interrupted after {chunk_idx} chunks")
            message_sent = synthesis_result.get_message_up_to(
                chunk_idx * seconds_per_chunk
            )
            return message_sent, True  # cut_off = True

        start_time = time.time()
        # Send chunk to output device
        self.output_device.consume_nonblocking(chunk_result.chunk)

        # CRITICAL: Wait for the chunk to play before sending the next one.
        # This is what makes interrupts work!
        speech_length = seconds_per_chunk
        processing_time = time.time() - start_time
        await asyncio.sleep(max(speech_length - processing_time, 0))
        chunk_idx += 1

    return message, False  # cut_off = False
```
Why Rate Limiting? Without rate limiting, all audio chunks would be sent immediately, which would:
- Push the entire response into the client's buffer before any interrupt check can run
- Make it impossible to know how much the user has actually heard
- Leave nothing to cancel when the user barges in, because the audio is already gone

By sending one chunk every N seconds:
- The stop_event is checked between every chunk, so interrupts take effect within one chunk duration
- chunk_idx * seconds_per_chunk tracks exactly how much speech was delivered
- Output stays in lockstep with real-time playback
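For raw PCM, seconds_per_chunk follows directly from the audio format. For example, 16-bit mono LINEAR16 at 16 kHz is 32,000 bytes per second:

```python
SAMPLE_RATE = 16_000   # Hz
BYTES_PER_SAMPLE = 2   # 16-bit PCM
CHANNELS = 1

bytes_per_second = SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS  # 32,000

def chunk_duration_seconds(chunk_size: int) -> float:
    """Playback duration of a raw PCM chunk of chunk_size bytes."""
    return chunk_size / bytes_per_second

# e.g. an 8,000-byte chunk plays for 0.25 seconds
assert chunk_duration_seconds(8_000) == 0.25
```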
The interrupt system is critical for natural conversations.
Scenario: Bot is saying "I think the weather will be nice today and tomorrow and—" when user interrupts with "Stop".
Step 1: User starts speaking
```python
# TranscriptionsWorker detects a new transcription while the bot is speaking
async def process(self, transcription):
    if not self.conversation.is_human_speaking:  # Bot was speaking!
        # Broadcast interrupt to all in-flight events
        interrupted = self.conversation.broadcast_interrupt()
        transcription.is_interrupt = interrupted
```
Step 2: broadcast_interrupt() stops everything
def broadcast_interrupt(self):
num_interrupts = 0
# Interrupt all queued events
while True:
try:
interruptible_event = self.interruptible_events.get_nowait()
if interruptible_event.interrupt(): # Sets interruption_event
num_interrupts += 1
except queue.Empty:
break
# Cancel current tasks
self.agent.cancel_current_task() # Stop generating text
self.agent_responses_worker.cancel_current_task() # Stop synthesizing
return num_interrupts > 0
Step 3: SynthesisResultsWorker detects interrupt
```python
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check stop_event (this is the interruption_event)
        if stop_event.is_set():
            logger.debug("Interrupted! Stopping speech.")
            # Calculate what was actually spoken
            seconds_spoken = chunk_idx * seconds_per_chunk
            partial_message = synthesis_result.get_message_up_to(seconds_spoken)
            # e.g., "I think the weather will be nice today"
            return partial_message, True  # cut_off = True
```
Step 4: Agent updates history
```python
if cut_off:
    # Update conversation history with the partial message
    self.agent.update_last_bot_message_on_cut_off(message_sent)
    # History now shows:
    # Bot: "I think the weather will be nice today" (incomplete)
```
Every event in the pipeline is wrapped in an InterruptibleEvent:
```python
import threading

class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()  # Initially not set
        self.interrupted = False

    def interrupt(self) -> bool:
        """Interrupt this event."""
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # Signal to stop!
            self.interrupted = True
            return True
        return False

    def is_interrupted(self) -> bool:
        return self.interruption_event.is_set()
```
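A usage sketch of the wrapper, with an illustrative payload:

```python
# Wrap a payload before putting it on a queue
event = InterruptibleEvent(payload={"text": "Hello there"})

# Downstream workers poll the event while processing
if event.is_interrupted():
    print("stop work on this payload")

# broadcast_interrupt() calls interrupt() on every queued event
assert event.interrupt() is True   # first call wins
assert event.interrupt() is False  # already interrupted
```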
Support multiple providers with a factory pattern:
```python
from typing import Dict

class VoiceHandler:
    """Multi-provider factory for voice components"""

    def create_transcriber(self, agent_config: Dict):
        """Create transcriber based on transcriberProvider"""
        provider = agent_config.get("transcriberProvider", "deepgram")
        if provider == "deepgram":
            return self._create_deepgram_transcriber(agent_config)
        elif provider == "assemblyai":
            return self._create_assemblyai_transcriber(agent_config)
        elif provider == "azure":
            return self._create_azure_transcriber(agent_config)
        elif provider == "google":
            return self._create_google_transcriber(agent_config)
        else:
            raise ValueError(f"Unknown transcriber provider: {provider}")

    def create_agent(self, agent_config: Dict):
        """Create LLM agent based on llmProvider"""
        provider = agent_config.get("llmProvider", "openai")
        if provider == "openai":
            return self._create_openai_agent(agent_config)
        elif provider == "gemini":
            return self._create_gemini_agent(agent_config)
        else:
            raise ValueError(f"Unknown LLM provider: {provider}")

    def create_synthesizer(self, agent_config: Dict):
        """Create voice synthesizer based on voiceProvider"""
        provider = agent_config.get("voiceProvider", "elevenlabs")
        if provider == "elevenlabs":
            return self._create_elevenlabs_synthesizer(agent_config)
        elif provider == "azure":
            return self._create_azure_synthesizer(agent_config)
        elif provider == "google":
            return self._create_google_synthesizer(agent_config)
        elif provider == "polly":
            return self._create_polly_synthesizer(agent_config)
        elif provider == "playht":
            return self._create_playht_synthesizer(agent_config)
        else:
            raise ValueError(f"Unknown voice provider: {provider}")
```
Voice AI engines typically use WebSocket for bidirectional audio streaming:
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Create voice components (agent_config comes from your session/config store)
    voice_handler = VoiceHandler()
    transcriber = voice_handler.create_transcriber(agent_config)
    agent = voice_handler.create_agent(agent_config)
    synthesizer = voice_handler.create_synthesizer(agent_config)

    # Create output device
    output_device = WebsocketOutputDevice(
        ws=websocket,
        sampling_rate=16000,
        audio_encoding=AudioEncoding.LINEAR16,
    )

    # Create conversation orchestrator
    conversation = StreamingConversation(
        output_device=output_device,
        transcriber=transcriber,
        agent=agent,
        synthesizer=synthesizer,
    )

    # Start all workers
    await conversation.start()

    try:
        # Receive audio from the client
        async for message in websocket.iter_bytes():
            conversation.receive_audio(message)
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        await conversation.terminate()
```
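For testing, a minimal client using the websockets library might stream a raw PCM file and print what comes back. The URL, file format, and framing are assumptions; real deployments usually wrap audio in a JSON or binary protocol:

```python
import asyncio
import websockets  # pip install websockets

async def stream_audio(path: str, url: str = "ws://localhost:8000/conversation"):
    async with websockets.connect(url) as ws:
        with open(path, "rb") as f:
            # Send ~0.25s of 16kHz 16-bit mono PCM per frame
            while chunk := f.read(8000):
                await ws.send(chunk)
                await asyncio.sleep(0.25)  # pace like a live microphone
        # Drain synthesized audio coming back
        async for message in ws:
            print(f"received {len(message)} bytes of audio")

asyncio.run(stream_audio("utterance.pcm"))
```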
Problem: Bot's audio jumps or cuts off mid-response.
Cause: Sending text to synthesizer in small chunks causes multiple TTS calls.
Solution: Buffer the entire LLM response before sending to synthesizer:
```python
# ❌ Bad: Yields sentence-by-sentence, triggering one TTS call per sentence
async for sentence in llm_stream:
    yield GeneratedResponse(message=BaseMessage(text=sentence))

# ✅ Good: Buffer the entire response, then yield once
full_response = ""
async for chunk in llm_stream:
    full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))
```
Problem: Bot hears itself speaking and responds to its own audio.
Cause: Transcriber not muted during bot speech.
Solution: Mute transcriber when bot starts speaking:
```python
# Before sending audio to output
self.transcriber.mute()

# After audio playback completes
self.transcriber.unmute()
```
Problem: User can't interrupt bot mid-sentence.
Cause: All audio chunks sent at once instead of rate-limited.
Solution: Rate-limit audio chunks to match real-time playback:
```python
async for chunk in synthesis_result.chunk_generator:
    start_time = time.time()
    # Send chunk
    output_device.consume_nonblocking(chunk)
    # Wait for the chunk's duration before sending the next one
    processing_time = time.time() - start_time
    await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
```
Problem: Memory usage grows over time.
Cause: WebSocket connections or API streams not properly closed.
Solution: Always use context managers and cleanup:
```python
try:
    async with websockets.connect(url) as ws:
        ...  # use the websocket
finally:
    # Cleanup
    await conversation.terminate()
    await transcriber.terminate()
```
Keep workers resilient: one bad item must not kill the loop.
```python
async def _run_loop(self):
    while self.active:
        try:
            item = await self.input_queue.get()
            await self.process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}", exc_info=True)
            # Don't crash the worker; continue processing
```
Shut down in order and give the queues a moment to drain:
```python
async def terminate(self):
    """Gracefully shut down all workers."""
    self.active = False
    # Stop all workers
    self.transcriber.terminate()
    self.agent.terminate()
    self.synthesizer.terminate()
    # Wait for queues to drain
    await asyncio.sleep(0.5)
    # Close connections
    if self.websocket:
        await self.websocket.close()
```
Instrument every stage:
```python
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info("🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info("⚠️ [INTERRUPT] User interrupted bot")

# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)
```
```python
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 calls/second

async def call_api(self, data):
    async with rate_limiter:
        return await self.client.post(data)
```
The whole pipeline reduces to producers and consumers connected by queues:
```python
# Producer
async def producer(queue):
    while True:
        item = await generate_item()
        queue.put_nowait(item)

# Consumer
async def consumer(queue):
    while True:
        item = await queue.get()
        await process_item(item)
```
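If producers can outpace consumers, an unbounded put_nowait lets queues grow without limit. A bounded queue with await queue.put(...) adds backpressure; this is a general asyncio pattern, not something the engine mandates:

```python
import asyncio

queue: asyncio.Queue = asyncio.Queue(maxsize=100)  # bound the buffer

async def producer(queue: asyncio.Queue):
    while True:
        item = await generate_item()
        await queue.put(item)  # suspends when the queue is full
```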
Instead of returning complete results, stream chunks as they are generated:
```python
# ❌ Bad: Wait for the entire response
async def generate_response(prompt):
    response = await openai.complete(prompt)  # pseudocode client; blocks ~5 seconds
    return response

# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
    async for chunk in openai.complete(prompt, stream=True):
        yield chunk  # Yield after 0.1s, 0.2s, etc.
```
Maintain conversation history for context:
```python
from enum import Enum
from typing import List, NamedTuple

class Sender(Enum):
    HUMAN = "human"
    BOT = "bot"

class Message(NamedTuple):
    sender: Sender
    text: str

class Transcript:
    def __init__(self):
        self.event_logs: List[Message] = []  # per-instance, not shared

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]
```
```python
async def test_transcriber():
    transcriber = DeepgramTranscriber(config)
    # Mock audio input
    audio_chunk = b'\x00\x01\x02...'
    transcriber.send_audio(audio_chunk)
    # Check output
    transcription = await transcriber.output_queue.get()
    assert transcription.message == "expected text"
```
```python
async def test_full_pipeline():
    # Create all components
    conversation = create_test_conversation()
    # Send test audio
    conversation.receive_audio(test_audio_chunk)
    # Wait for a response
    response = await wait_for_audio_output(timeout=5)
    assert response is not None
```
```python
async def test_interrupt():
    conversation = create_test_conversation()
    # Start the bot speaking
    await conversation.agent.generate_response("Tell me a long story")
    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()
    # Verify the partial message landed in the transcript
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message
```
When implementing a voice AI engine, these related skills, libraries, and providers are useful:

Related Skills:
- @websocket-patterns - WebSocket implementation details
- @async-python - asyncio and async patterns
- @streaming-apis - streaming API integration
- @audio-processing - audio format conversion and processing
- @systematic-debugging - debugging complex async pipelines

Libraries:
- asyncio - async programming
- websockets - WebSocket client/server
- FastAPI - WebSocket server framework
- pydub - audio manipulation
- numpy - audio data processing

API Providers:
- Transcription: Deepgram, AssemblyAI, Azure Speech, Google Speech-to-Text
- LLM: OpenAI, Google Gemini
- Synthesis: ElevenLabs, Azure, Google, Amazon Polly, Play.ht
Building a voice AI engine requires:
- Streaming transcription, generation, and synthesis at every stage
- Independent async workers connected by queues
- Interrupt handling wired through every in-flight event
- Rate-limited audio output that tracks real-time playback
- Clean provider abstractions behind a factory
The key insight: Everything must stream and everything must be interruptible for natural, real-time conversations.