Voice AI Setup Guide

The complete step-by-step guide to building AI voice assistants. Covers OpenAI Whisper, Google Cloud Speech, AWS Transcribe and Azure Speech — with full Python and JavaScript code, real-time streaming, production architecture and troubleshooting.


Beginner's Voice AI Guide

What is Voice AI?

Voice AI combines several technologies to create intelligent voice assistants that understand, process and respond to human speech. The core components are:

  • Speech Recognition (STT) — converts voice to text
  • Natural Language Processing (NLP) — understands meaning and intent
  • AI Logic Processing — makes decisions and formulates responses
  • Text-to-Speech (TTS) — converts text back to voice
  • Voice Activity Detection (VAD) — detects when someone is speaking
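The components above can be sketched as a single pipeline. This is a hypothetical skeleton: the `speech_to_text` and `text_to_speech` functions are stubs standing in for any of the provider SDKs covered later in this guide, and the keyword matcher stands in for a real NLP model.

```python
# Minimal sketch of the voice AI pipeline: STT -> NLP -> logic -> TTS.
# All stage functions here are illustrative stubs, not real SDK calls.
import datetime


def speech_to_text(audio_bytes: bytes) -> str:
    """STT stage: a real system calls Whisper, Google, AWS, or Azure here."""
    return "what time is it"  # stubbed transcription


def detect_intent(text: str) -> str:
    """NLP stage: trivial keyword matching stands in for an intent model."""
    t = text.lower()
    if "time" in t:
        return "get_time"
    if any(g in t for g in ("hello", "hi")):
        return "greet"
    return "unknown"


def respond(intent: str) -> str:
    """Logic stage: map a detected intent to a response."""
    responses = {
        "get_time": f"It is {datetime.datetime.now().strftime('%I:%M %p')}",
        "greet": "Hello! How can I help?",
        "unknown": "Sorry, I didn't catch that.",
    }
    return responses[intent]


def text_to_speech(text: str) -> bytes:
    """TTS stage: a real system synthesizes audio here."""
    return text.encode("utf-8")  # stubbed audio bytes


def handle_utterance(audio: bytes) -> bytes:
    """Run one utterance through the full pipeline."""
    return text_to_speech(respond(detect_intent(speech_to_text(audio))))
```

Each provider section below fills in the STT and TTS stubs with a real SDK.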

Your First Voice AI Assistant (5 Minutes)

Build a simple voice assistant using the Web Speech API — no servers or API keys required:

Simple Voice Assistant (HTML + JavaScript)
```html
<!DOCTYPE html>
<html>
<head>
  <title>My First Voice Assistant</title>
</head>
<body>
  <h1>🤖 Voice Assistant</h1>
  <button id="startBtn">Start Listening</button>
  <button id="stopBtn" disabled>Stop</button>
  <div id="output"></div>

  <script>
    // Browser speech recognition (Chrome exposes the webkit-prefixed name)
    const SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
    const recognition = new SpeechRecognition();
    recognition.continuous = true;      // keep listening until stopped
    recognition.interimResults = false;
    recognition.lang = 'en-US';

    const startBtn = document.getElementById('startBtn');
    const stopBtn = document.getElementById('stopBtn');
    const output = document.getElementById('output');

    function speak(text) {
      // Text-to-speech via the Speech Synthesis API
      window.speechSynthesis.speak(new SpeechSynthesisUtterance(text));
    }

    function processCommand(text) {
      // Basic keyword matching to understand what the user wants
      const t = text.toLowerCase();
      if (t.includes('hello')) return 'Hello! How can I help you?';
      if (t.includes('time')) return 'The time is ' + new Date().toLocaleTimeString();
      if (t.includes('joke')) return "Why don't programmers like nature? It has too many bugs!";
      if (t.includes('goodbye')) return 'Goodbye! Have a great day!';
      return 'I heard: ' + text;
    }

    recognition.onresult = (event) => {
      const text = event.results[event.results.length - 1][0].transcript;
      const reply = processCommand(text);
      output.innerHTML += `<p>🗣️ ${text}<br>🤖 ${reply}</p>`;
      speak(reply);
    };

    startBtn.onclick = () => { recognition.start(); startBtn.disabled = true; stopBtn.disabled = false; };
    stopBtn.onclick = () => { recognition.stop(); startBtn.disabled = false; stopBtn.disabled = true; };
  </script>
</body>
</html>
```
Try it: Save the code as an HTML file and open it in Chrome. Click "Start Listening" and say "Hello", "What time is it?", "Tell me a joke" or "Goodbye".

Understanding the Code

1. Speech Recognition

The browser's Speech Recognition API converts voice to text. continuous: true keeps listening until stopped.

2. Intent Processing

processCommand() analyses text for keywords — basic NLP to understand what the user wants.

3. Response Generation

Based on detected keywords, appropriate responses are generated. Production systems use ML models for this.

4. Text-to-Speech Output

speak() uses the Speech Synthesis API to convert text back to voice.

Browser limitations: Browser-based speech recognition requires internet, supports simple commands only and doesn't work in all browsers. For production, use dedicated AI services below.

Voice AI Platform Comparison

| Platform | Best For | Pricing | Languages | Accuracy |
|---|---|---|---|---|
| OpenAI Whisper | High accuracy, multilingual | $0.006/min | 99+ | Excellent |
| Google Cloud Speech | Real-time streaming | $0.024/min | 125+ | Excellent |
| AWS Transcribe | Enterprise / AWS integration | $0.024/min | 31+ | Very Good |
| Azure Speech | Microsoft ecosystem | $1.00/hr | 85+ | Very Good |
| AssemblyAI | Developer experience | $0.00037/sec | 22+ | Very Good |
| Web Speech API | Quick prototypes | Free | 50+ | Good |
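Per-minute rates make rough budgeting straightforward. The sketch below converts the table's published rates to a common per-minute basis and estimates a monthly bill; the figures are illustrative only, since real costs depend on free tiers, rounding rules and feature add-ons.

```python
# Rough monthly STT cost comparison using the rates from the table above.
# AssemblyAI's per-second and Azure's hourly rates are converted to per-minute.
RATES_PER_MIN = {
    "OpenAI Whisper": 0.006,
    "Google Cloud Speech": 0.024,
    "AWS Transcribe": 0.024,
    "Azure Speech": 1.00 / 60,     # $1.00/hr
    "AssemblyAI": 0.00037 * 60,    # $0.00037/sec
}


def monthly_cost(rate_per_min: float, minutes: int = 1000) -> float:
    """Dollar cost for a month of transcription at the given per-minute rate."""
    return round(rate_per_min * minutes, 2)


for name, rate in RATES_PER_MIN.items():
    print(f"{name}: ${monthly_cost(rate):.2f}/month for 1,000 min")
```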
OpenAI Whisper
State-of-the-art speech recognition with exceptional multilingual support
  • Highest accuracy available
  • 99+ languages
  • Works with noisy audio
  • Real-time + batch
Google Cloud Speech
Robust streaming recognition with advanced features
  • Excellent streaming
  • Speaker diarization
  • Word-level timestamps
  • Custom vocabulary
AWS Transcribe
Enterprise-grade with deep AWS integration
  • Medical & legal variants
  • PII redaction built-in
  • Custom language models
  • Batch + streaming
Azure Speech
Microsoft speech services with Windows integration
  • Custom neural voices
  • Speaker recognition
  • Intent recognition
  • Office 365 integration
Platform selection: Choose OpenAI Whisper for highest accuracy. Google Cloud for real-time streaming. AWS Transcribe if you're already on AWS. Azure if you're in the Microsoft ecosystem. Web Speech API for quick prototypes.
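That guidance can be encoded as a trivial lookup. This is a toy, purely illustrative helper (not part of any SDK), mapping your single most important requirement to the suggested platform:

```python
# Toy encoding of the platform-selection guidance above.
def choose_platform(priority: str) -> str:
    """Return the suggested platform for the given top requirement."""
    suggestions = {
        "accuracy": "OpenAI Whisper",
        "streaming": "Google Cloud Speech",
        "aws": "AWS Transcribe",
        "microsoft": "Azure Speech",
        "prototype": "Web Speech API",
    }
    # Default to Whisper, the highest-accuracy general choice
    return suggestions.get(priority, "OpenAI Whisper")
```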

OpenAI Whisper Setup

Setting Up OpenAI Whisper API

1. Get API Key

Sign up at platform.openai.com and create an API key.

2. Install Dependencies

Python: pip install openai requests python-dotenv / Node.js: npm install openai form-data dotenv

Python Implementation

OpenAI Whisper Python Client
```python
import os

import openai
from dotenv import load_dotenv

load_dotenv()


class WhisperVoiceAI:
    def __init__(self):
        self.client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

    def transcribe_audio(self, audio_file_path, language=None):
        """Transcribe an audio file using the Whisper API."""
        try:
            with open(audio_file_path, 'rb') as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    language=language,               # Optional: 'en', 'es', 'fr', etc.
                    response_format="verbose_json",  # Get timestamps
                    temperature=0                    # More deterministic results
                )
            return {
                'text': transcript.text,
                'language': transcript.language,
                'duration': transcript.duration,
                'segments': transcript.segments if hasattr(transcript, 'segments') else []
            }
        except Exception as e:
            return {'error': str(e)}

    def transcribe_with_timestamps(self, audio_file_path):
        """Get a transcription with word-level timestamps."""
        try:
            with open(audio_file_path, 'rb') as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="verbose_json",
                    timestamp_granularities=["word"]
                )
            return {
                'text': transcript.text,
                'words': transcript.words if hasattr(transcript, 'words') else []
            }
        except Exception as e:
            return {'error': str(e)}

    def process_voice_command(self, audio_file_path):
        """Complete voice AI pipeline: transcribe + process + respond."""
        # Step 1: Transcribe audio
        transcription = self.transcribe_audio(audio_file_path)
        if 'error' in transcription:
            return transcription

        # Step 2: Process with GPT
        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful voice assistant. Provide concise, helpful responses."
                },
                {"role": "user", "content": transcription['text']}
            ],
            max_tokens=150,
            temperature=0.7
        )
        return {
            'transcription': transcription['text'],
            'response': response.choices[0].message.content,
            'language': transcription.get('language', 'unknown')
        }

    def generate_speech(self, text, voice="alloy"):
        """Convert text to speech using OpenAI TTS."""
        try:
            response = self.client.audio.speech.create(
                model="tts-1",
                voice=voice,  # alloy, echo, fable, onyx, nova, shimmer
                input=text,
                response_format="mp3"
            )
            return response.content  # Returns audio bytes
        except Exception as e:
            return {'error': str(e)}


# Example usage
if __name__ == "__main__":
    voice_ai = WhisperVoiceAI()

    # Transcribe an audio file
    result = voice_ai.transcribe_audio("audio.wav")
    print("Transcription:", result)

    # Full voice AI processing
    ai_response = voice_ai.process_voice_command("audio.wav")
    print("AI Response:", ai_response)

    # Generate a spoken response and save it to a file
    if 'response' in ai_response:
        audio_bytes = voice_ai.generate_speech(ai_response['response'])
        with open("response.mp3", "wb") as f:
            f.write(audio_bytes)
```

Real-time Audio Processing

Real-time Voice AI with PyAudio
```python
import os
import tempfile
import threading
import wave

import pyaudio

# WhisperVoiceAI is the class from the previous example, saved as whisper_voice_ai.py
from whisper_voice_ai import WhisperVoiceAI


class RealTimeVoiceAI:
    def __init__(self):
        self.voice_ai = WhisperVoiceAI()
        self.is_recording = False
        self.audio_buffer = []

        # Audio settings
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.record_seconds = 3  # Process every 3 seconds

        # Initialize PyAudio
        self.audio = pyaudio.PyAudio()

    def start_listening(self):
        """Start continuous voice recognition."""
        self.is_recording = True
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk
        )
        print("🎤 Voice AI started. Say something...")
        try:
            while self.is_recording:
                frames = []
                # Record for the specified duration
                for _ in range(0, int(self.rate / self.chunk * self.record_seconds)):
                    if not self.is_recording:
                        break
                    data = stream.read(self.chunk)
                    frames.append(data)

                if frames:
                    # Save the recording to a temporary WAV file
                    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                        wf = wave.open(temp_file.name, 'wb')
                        wf.setnchannels(self.channels)
                        wf.setsampwidth(self.audio.get_sample_size(self.format))
                        wf.setframerate(self.rate)
                        wf.writeframes(b''.join(frames))
                        wf.close()

                    # Process the audio in a separate thread
                    threading.Thread(
                        target=self.process_audio_chunk,
                        args=(temp_file.name,)
                    ).start()
        except KeyboardInterrupt:
            print("\n🛑 Stopping voice AI...")
        finally:
            stream.stop_stream()
            stream.close()
            self.audio.terminate()

    def process_audio_chunk(self, audio_file):
        """Process an audio chunk and respond."""
        try:
            result = self.voice_ai.process_voice_command(audio_file)
            if 'error' not in result and result.get('transcription'):
                print(f"🗣️ You: {result['transcription']}")
                print(f"🤖 AI: {result['response']}")

                # Generate and play the spoken response
                if result.get('response'):
                    audio_bytes = self.voice_ai.generate_speech(result['response'])
                    if isinstance(audio_bytes, bytes):
                        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as response_file:
                            response_file.write(audio_bytes)
                        self.play_audio(response_file.name)
        except Exception as e:
            print(f"❌ Processing error: {e}")
        finally:
            # Clean up the temporary recording
            if os.path.exists(audio_file):
                os.unlink(audio_file)

    def play_audio(self, audio_file):
        """Play an audio file (cross-platform)."""
        import platform
        system = platform.system()
        if system == "Darwin":  # macOS
            os.system(f"afplay {audio_file}")
        elif system == "Windows":
            os.system(f"start {audio_file}")
        else:  # Linux
            os.system(f"mpg123 {audio_file}")

    def stop_listening(self):
        """Stop voice recognition."""
        self.is_recording = False


# Usage
if __name__ == "__main__":
    real_time_ai = RealTimeVoiceAI()
    try:
        real_time_ai.start_listening()
    except KeyboardInterrupt:
        real_time_ai.stop_listening()
```
.env Configuration
```
# .env file
OPENAI_API_KEY=your_openai_api_key_here
```

Google Cloud Speech Setup

Google Cloud Speech-to-Text Integration

1. Setup Google Cloud Project

Create a project at console.cloud.google.com and enable the Speech-to-Text API.

2. Install SDK

Python: pip install google-cloud-speech google-cloud-texttospeech / Node.js: npm install @google-cloud/speech @google-cloud/text-to-speech

Python Implementation

Google Cloud Speech Python Client
```python
import io
import os
import queue
import threading
import time
from itertools import chain

import pyaudio
from google.cloud import speech, texttospeech


class GoogleVoiceAI:
    def __init__(self, credentials_path=None):
        # Set up authentication
        if credentials_path:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
        self.speech_client = speech.SpeechClient()
        self.tts_client = texttospeech.TextToSpeechClient()

        # Audio configuration
        self.rate = 16000
        self.chunk = int(self.rate / 10)  # 100ms chunks

    def transcribe_file(self, audio_file_path, language_code="en-US"):
        """Transcribe an audio file."""
        with io.open(audio_file_path, "rb") as audio_file:
            content = audio_file.read()

        audio = speech.RecognitionAudio(content=content)
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=self.rate,
            language_code=language_code,
            enable_automatic_punctuation=True,
            enable_word_time_offsets=True,
            model="latest_long",  # Use the latest long-form model
        )
        response = self.speech_client.recognize(config=config, audio=audio)

        results = []
        for result in response.results:
            alternative = result.alternatives[0]
            words = []
            for word_info in alternative.words:
                words.append({
                    'word': word_info.word,
                    'start_time': word_info.start_time.total_seconds(),
                    'end_time': word_info.end_time.total_seconds(),
                })
            results.append({
                'transcript': alternative.transcript,
                'confidence': alternative.confidence,
                'words': words
            })
        return results

    def streaming_recognize(self, audio_generator):
        """Real-time streaming speech recognition."""
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=self.rate,
            language_code="en-US",
            enable_automatic_punctuation=True,
        )
        streaming_config = speech.StreamingRecognitionConfig(
            config=config,
            interim_results=True,
            single_utterance=False,
        )

        audio_requests = (
            speech.StreamingRecognizeRequest(audio_content=chunk)
            for chunk in audio_generator
        )
        # The first request carries the config; the rest carry audio.
        # chain() keeps the audio requests lazy instead of exhausting the generator.
        requests = chain(
            [speech.StreamingRecognizeRequest(streaming_config=streaming_config)],
            audio_requests
        )

        responses = self.speech_client.streaming_recognize(requests)
        for response in responses:
            for result in response.results:
                if result.is_final:
                    yield {
                        'transcript': result.alternatives[0].transcript,
                        'confidence': result.alternatives[0].confidence,
                        'is_final': True
                    }
                else:  # Interim result
                    yield {
                        'transcript': result.alternatives[0].transcript,
                        'confidence': 0,
                        'is_final': False
                    }

    def generate_speech(self, text, language_code="en-US", voice_name="en-US-Standard-A"):
        """Generate speech from text."""
        synthesis_input = texttospeech.SynthesisInput(text=text)
        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name,
            ssml_gender=texttospeech.SsmlVoiceGender.FEMALE,
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )
        response = self.tts_client.synthesize_speech(
            input=synthesis_input, voice=voice, audio_config=audio_config
        )
        return response.audio_content


class GoogleStreamingVoiceAI:
    def __init__(self, credentials_path=None):
        self.voice_ai = GoogleVoiceAI(credentials_path)
        self.audio_queue = queue.Queue()
        self.is_recording = False

        # PyAudio setup
        self.audio = pyaudio.PyAudio()

    def audio_generator(self):
        """Yield audio chunks for streaming."""
        while self.is_recording:
            try:
                chunk = self.audio_queue.get(timeout=1)
                if chunk is None:
                    break
                yield chunk
            except queue.Empty:
                continue

    def start_streaming(self):
        """Start streaming voice recognition."""
        self.is_recording = True

        # Start the audio recording thread
        recording_thread = threading.Thread(target=self._record_audio)
        recording_thread.daemon = True
        recording_thread.start()

        # Start speech recognition
        try:
            print("🎤 Google Cloud streaming started...")
            for result in self.voice_ai.streaming_recognize(self.audio_generator()):
                if result['is_final'] and result['transcript'].strip():
                    print(f"🗣️ Final: {result['transcript']} (confidence: {result['confidence']:.2f})")

                    # Process the final transcript
                    response = self.process_command(result['transcript'])
                    if response:
                        print(f"🤖 Response: {response}")
                        # Generate a spoken response
                        audio_content = self.voice_ai.generate_speech(response)
                        self.play_audio_content(audio_content)
                else:
                    print(f"🔄 Interim: {result['transcript']}")
        except Exception as e:
            print(f"❌ Streaming error: {e}")
        finally:
            self.stop_streaming()

    def _record_audio(self):
        """Record audio and add it to the queue."""
        stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.voice_ai.rate,
            input=True,
            frames_per_buffer=self.voice_ai.chunk,
        )
        try:
            while self.is_recording:
                data = stream.read(self.voice_ai.chunk)
                self.audio_queue.put(data)
        except Exception as e:
            print(f"❌ Recording error: {e}")
        finally:
            stream.stop_stream()
            stream.close()

    def process_command(self, transcript):
        """Process a voice command and generate a response."""
        transcript_lower = transcript.lower()
        if "hello" in transcript_lower or "hi" in transcript_lower:
            return "Hello! I'm your Google Cloud voice assistant. How can I help you?"
        elif "time" in transcript_lower:
            import datetime
            now = datetime.datetime.now()
            return f"The current time is {now.strftime('%I:%M %p')}"
        elif "weather" in transcript_lower:
            return "I'd need to integrate with a weather API to provide weather information."
        elif "stop" in transcript_lower or "quit" in transcript_lower:
            self.stop_streaming()
            return "Goodbye! Stopping the voice assistant."
        else:
            return f"I heard you say: {transcript}. How can I help with that?"

    def play_audio_content(self, audio_content):
        """Play synthesized audio content."""
        import platform
        import tempfile

        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
            temp_file.write(audio_content)
            temp_file_path = temp_file.name

        # Play audio based on platform
        system = platform.system()
        if system == "Darwin":  # macOS
            os.system(f"afplay {temp_file_path}")
        elif system == "Windows":
            os.system(f"start {temp_file_path}")
        else:  # Linux
            os.system(f"mpg123 {temp_file_path}")

        # Clean up
        time.sleep(1)  # Give the player time to start
        os.unlink(temp_file_path)

    def stop_streaming(self):
        """Stop streaming."""
        self.is_recording = False
        self.audio_queue.put(None)
        self.audio.terminate()


# Usage example
if __name__ == "__main__":
    # Set up the credentials path
    credentials_path = "path/to/your/service-account-key.json"

    streaming_ai = GoogleStreamingVoiceAI(credentials_path)
    try:
        streaming_ai.start_streaming()
    except KeyboardInterrupt:
        print("\n🛑 Stopping...")
        streaming_ai.stop_streaming()
```

AWS Transcribe Setup

AWS Transcribe Integration

Enterprise-grade speech recognition with deep AWS integration, medical/legal variants and built-in PII redaction.
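The built-in PII redaction mentioned above is enabled per job through the `ContentRedaction` parameter of `start_transcription_job`. The helper below just assembles the request parameters (the job name and S3 URI are placeholders); pass the resulting dict as `transcribe.start_transcription_job(**params)`.

```python
# Build start_transcription_job parameters with PII redaction enabled.
# The job name and media URI below are placeholders, not real resources.
def redaction_job_params(job_name: str, media_uri: str) -> dict:
    """Return AWS Transcribe job parameters that redact PII in the output."""
    return {
        "TranscriptionJobName": job_name,
        "Media": {"MediaFileUri": media_uri},
        "MediaFormat": "wav",
        "LanguageCode": "en-US",
        "ContentRedaction": {
            "RedactionType": "PII",         # redact personally identifiable information
            "RedactionOutput": "redacted",  # or "redacted_and_unredacted"
        },
    }


params = redaction_job_params("demo-job", "s3://your-transcribe-bucket/audio/demo-job.wav")
print(params["ContentRedaction"])
```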

AWS Transcribe Python Implementation
```python
import asyncio
import base64
import json
import time
from urllib.parse import urlencode

import boto3
import websockets


class AWSVoiceAI:
    def __init__(self, region_name='us-east-1'):
        self.region = region_name
        self.transcribe = boto3.client('transcribe', region_name=region_name)
        self.polly = boto3.client('polly', region_name=region_name)
        self.s3 = boto3.client('s3', region_name=region_name)

    def transcribe_file(self, audio_file_path, job_name, language_code='en-US'):
        """Transcribe an audio file using AWS Transcribe."""
        # Upload the file to S3 first
        bucket_name = 'your-transcribe-bucket'  # Create this bucket
        s3_key = f"audio/{job_name}.wav"
        try:
            self.s3.upload_file(audio_file_path, bucket_name, s3_key)
            media_uri = f"s3://{bucket_name}/{s3_key}"

            # Start the transcription job
            self.transcribe.start_transcription_job(
                TranscriptionJobName=job_name,
                Media={'MediaFileUri': media_uri},
                MediaFormat='wav',
                LanguageCode=language_code,
                Settings={
                    'ShowSpeakerLabels': True,
                    'MaxSpeakerLabels': 2,
                    'ShowAlternatives': True,
                    'MaxAlternatives': 3
                }
            )

            # Poll until the job completes
            while True:
                status = self.transcribe.get_transcription_job(
                    TranscriptionJobName=job_name
                )
                job_status = status['TranscriptionJob']['TranscriptionJobStatus']
                if job_status == 'COMPLETED':
                    transcript_uri = status['TranscriptionJob']['Transcript']['TranscriptFileUri']
                    # Download and parse the transcript
                    import urllib.request
                    with urllib.request.urlopen(transcript_uri) as response:
                        transcript_data = json.loads(response.read())
                    return self._parse_transcript(transcript_data)
                elif job_status == 'FAILED':
                    raise Exception(f"Transcription failed: {status['TranscriptionJob'].get('FailureReason')}")
                time.sleep(5)  # Wait 5 seconds before checking again
        except Exception as e:
            return {'error': str(e)}

    def _parse_transcript(self, transcript_data):
        """Parse an AWS Transcribe response."""
        results = transcript_data['results']

        # Main transcript
        transcript = results['transcripts'][0]['transcript']

        # Items with timestamps
        items = []
        for item in results['items']:
            if item['type'] == 'pronunciation':
                items.append({
                    'word': item['alternatives'][0]['content'],
                    'confidence': float(item['alternatives'][0]['confidence']),
                    'start_time': float(item.get('start_time', 0)),
                    'end_time': float(item.get('end_time', 0))
                })

        # Speaker labels if available
        speakers = []
        if 'speaker_labels' in results:
            for segment in results['speaker_labels']['segments']:
                speakers.append({
                    'speaker_label': segment['speaker_label'],
                    'start_time': float(segment['start_time']),
                    'end_time': float(segment['end_time']),
                    'items': segment['items']
                })

        return {'transcript': transcript, 'items': items, 'speakers': speakers}

    def generate_speech(self, text, voice_id='Joanna'):
        """Generate speech using AWS Polly."""
        try:
            response = self.polly.synthesize_speech(
                Text=text,
                OutputFormat='mp3',
                VoiceId=voice_id,
                Engine='neural',  # Neural voices give better quality
                SampleRate='22050'
            )
            return response['AudioStream'].read()
        except Exception as e:
            return {'error': str(e)}

    async def streaming_transcribe(self, audio_generator):
        """Real-time streaming transcription over WebSockets.

        Note: this is a simplified sketch. The real service requires a
        SigV4-signed URL and event-stream framing; use the official
        amazon-transcribe streaming SDK in production.
        """
        endpoint = f"transcribestreaming.{self.region}.amazonaws.com:8443"
        signed_url = self._create_signed_websocket_url(endpoint, self.region, 'transcribe')

        try:
            async with websockets.connect(signed_url) as websocket:
                async def send_audio():
                    for audio_chunk in audio_generator:
                        if audio_chunk:
                            audio_event = {
                                "MessageType": "AudioEvent",
                                "AudioChunk": base64.b64encode(audio_chunk).decode('utf-8')
                            }
                            await websocket.send(json.dumps(audio_event))
                        await asyncio.sleep(0.1)

                # Start sending audio
                asyncio.create_task(send_audio())

                # Receive transcription results
                async for message in websocket:
                    data = json.loads(message)
                    if data.get("MessageType") == "TranscriptEvent":
                        results = data.get("Transcript", {}).get("Results", [])
                        for result in results:
                            if not result.get("IsPartial", True):  # Final result
                                transcript = "".join(
                                    alt.get("Transcript", "")
                                    for alt in result.get("Alternatives", [])
                                )
                                if transcript.strip():
                                    yield {
                                        'transcript': transcript,
                                        'is_final': True,
                                        'confidence': result.get("Alternatives", [{}])[0].get("Confidence", 0)
                                    }
        except Exception as e:
            print(f"AWS Streaming error: {e}")

    def _create_signed_websocket_url(self, endpoint, region, service):
        """Create a WebSocket URL for AWS Transcribe Streaming.

        This is a simplified version; the actual signing is omitted.
        In production, use the amazon-transcribe streaming SDK, which
        handles SigV4 signing for you.
        """
        # Get AWS credentials (signing itself is not implemented here)
        session = boto3.Session()
        credentials = session.get_credentials()

        url = f"wss://{endpoint}/stream-transcription-websocket"
        params = {
            'language-code': 'en-US',
            'media-encoding': 'pcm',
            'sample-rate': '16000'
        }
        return f"{url}?{urlencode(params)}"


# Real-time streaming example
class AWSStreamingVoiceAI:
    def __init__(self, region='us-east-1'):
        self.voice_ai = AWSVoiceAI(region)
        self.is_recording = False

    def start_streaming(self):
        """Start AWS streaming transcription."""
        import queue
        import threading

        import pyaudio

        audio_queue = queue.Queue()

        # Audio settings
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000

        audio = pyaudio.PyAudio()

        def audio_generator():
            while self.is_recording:
                try:
                    yield audio_queue.get(timeout=1)
                except queue.Empty:
                    continue

        def record_audio():
            stream = audio.open(
                format=FORMAT, channels=CHANNELS, rate=RATE,
                input=True, frames_per_buffer=CHUNK
            )
            try:
                while self.is_recording:
                    audio_queue.put(stream.read(CHUNK))
            finally:
                stream.stop_stream()
                stream.close()
                audio.terminate()

        async def transcribe_stream():
            async for result in self.voice_ai.streaming_transcribe(audio_generator()):
                if result['is_final'] and result['transcript'].strip():
                    print(f"🗣️ AWS: {result['transcript']} (confidence: {result['confidence']:.2f})")

                    # Generate a response
                    response = self.process_command(result['transcript'])
                    if response:
                        print(f"🤖 Response: {response}")
                        audio_content = self.voice_ai.generate_speech(response)
                        if isinstance(audio_content, bytes):
                            self.play_audio_content(audio_content)

        # Start recording and transcription
        self.is_recording = True
        recording_thread = threading.Thread(target=record_audio)
        recording_thread.daemon = True
        recording_thread.start()

        try:
            asyncio.run(transcribe_stream())
        except KeyboardInterrupt:
            print("\n🛑 Stopping AWS Voice AI...")
        finally:
            self.is_recording = False

    def process_command(self, transcript):
        """Process a voice command."""
        transcript_lower = transcript.lower()
        if "hello" in transcript_lower:
            return "Hello! I'm your AWS Transcribe voice assistant."
        elif "time" in transcript_lower:
            import datetime
            return f"The current time is {datetime.datetime.now().strftime('%I:%M %p')}"
        elif "stop" in transcript_lower:
            self.is_recording = False
            return "Stopping the voice assistant. Goodbye!"
        else:
            return f"I understood: {transcript}. How can I help you with that?"

    def play_audio_content(self, audio_content):
        """Play audio using the system player."""
        import os
        import platform
        import tempfile

        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
            temp_file.write(audio_content)
            temp_file_path = temp_file.name

        system = platform.system()
        if system == "Darwin":
            os.system(f"afplay {temp_file_path}")
        elif system == "Windows":
            os.system(f"start {temp_file_path}")
        else:
            os.system(f"mpg123 {temp_file_path}")

        # Clean up after a short delay
        time.sleep(1)
        os.unlink(temp_file_path)


# Usage
if __name__ == "__main__":
    # Make sure AWS credentials are configured:
    # run `aws configure` or set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    streaming_ai = AWSStreamingVoiceAI(region='us-east-1')
    streaming_ai.start_streaming()
```

Azure Speech Setup

Microsoft Azure Speech Integration

Azure provides custom neural voices, speaker recognition and deep Microsoft ecosystem integration.

Azure Speech Services Implementation
```python
import os
import time

import azure.cognitiveservices.speech as speechsdk
from dotenv import load_dotenv

load_dotenv()


class AzureVoiceAI:
    def __init__(self):
        # Set up the Azure Speech configuration
        speech_key = os.getenv('AZURE_SPEECH_KEY')
        service_region = os.getenv('AZURE_SPEECH_REGION', 'eastus')

        self.speech_config = speechsdk.SpeechConfig(
            subscription=speech_key,
            region=service_region
        )

        # Configure speech recognition
        self.speech_config.speech_recognition_language = "en-US"

        # Configure speech synthesis
        self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"

        # Audio configuration
        self.audio_config = speechsdk.audio.AudioConfig(use_default_microphone=True)

        # Initialize the recognizer and synthesizer
        self.speech_recognizer = speechsdk.SpeechRecognizer(
            speech_config=self.speech_config,
            audio_config=self.audio_config
        )
        self.speech_synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=self.speech_config
        )

    def recognize_once(self):
        """Single-shot recognition."""
        print("🎤 Say something...")
        result = self.speech_recognizer.recognize_once()

        if result.reason == speechsdk.ResultReason.RecognizedSpeech:
            return {
                'text': result.text,
                'reason': 'recognized',
                'confidence': getattr(result, 'confidence', None)
            }
        elif result.reason == speechsdk.ResultReason.NoMatch:
            return {
                'text': '',
                'reason': 'no_match',
                'details': result.no_match_details
            }
        elif result.reason == speechsdk.ResultReason.Canceled:
            return {
                'text': '',
                'reason': 'canceled',
                'error': result.cancellation_details.reason
            }

    def continuous_recognition(self, callback_function=None):
        """Continuous speech recognition."""
        def recognized_handler(evt):
            if evt.result.text and callback_function:
                callback_function({
                    'text': evt.result.text,
                    'reason': 'final',
                    'confidence': getattr(evt.result, 'confidence', None)
                })

        def recognizing_handler(evt):
            if evt.result.text and callback_function:
                callback_function({
                    'text': evt.result.text,
                    'reason': 'partial',
                    'confidence': None
                })

        # Connect callbacks
        self.speech_recognizer.recognized.connect(recognized_handler)
        self.speech_recognizer.recognizing.connect(recognizing_handler)

        # Start continuous recognition
        self.speech_recognizer.start_continuous_recognition()
        print("🎤 Continuous recognition started. Press Ctrl+C to stop...")
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\n🛑 Stopping recognition...")
        finally:
            self.speech_recognizer.stop_continuous_recognition()

    def speak_text(self, text):
        """Convert text to speech."""
        try:
            result = self.speech_synthesizer.speak_text_async(text).get()
            if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
                return {'status': 'success', 'message': f'Speech synthesized for text: {text}'}
            elif result.reason == speechsdk.ResultReason.Canceled:
                return {
                    'status': 'error',
                    'message': f'Speech synthesis canceled: {result.cancellation_details.reason}'
                }
        except Exception as e:
            return {'status': 'error', 'message': str(e)}

    def speak_ssml(self, ssml_text):
        """Speak using SSML for advanced control."""
        try:
            result = self.speech_synthesizer.speak_ssml_async(ssml_text).get()
            if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
                return {'status': 'success', 'message': 'SSML speech synthesized'}
            else:
                return {'status': 'error', 'message': 'SSML synthesis failed'}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}

    def get_available_voices(self):
        """Get the list of available voices."""
        try:
            synthesizer = speechsdk.SpeechSynthesizer(
                speech_config=self.speech_config,
                audio_config=None
            )
            result = synthesizer.get_voices_async().get()
            voices = []
            for voice in result.voices:
                voices.append({
                    'name': voice.name,
                    'display_name': voice.display_name,
                    'local_name': voice.local_name,
                    'gender': str(voice.gender),
                    'locale': voice.locale,
                    'voice_type': str(voice.voice_type)
                })
            return voices
        except Exception as e:
            return {'error': str(e)}


class AzureConversationalAI:
    def __init__(self):
        self.voice_ai = AzureVoiceAI()
        self.conversation_active = False

    def start_conversation(self):
        """Start an interactive voice conversation."""
        print("🤖 Azure Voice AI Assistant started!")
        print("💬 Try saying: 'Hello', 'What time is it?', 'Tell me a joke', or 'Goodbye'")

        def handle_speech_result(result):
            if result['reason'] == 'final' and result['text'].strip():
                print(f"\n🗣️ You: {result['text']}")

                # Process the command
                response = self.process_voice_command(result['text'])
                if response:
                    print(f"🤖 AI: {response}")
                    # Speak the response
                    self.voice_ai.speak_text(response)

                # Check for an exit command
                if "goodbye" in result['text'].lower() or "exit" in result['text'].lower():
                    self.conversation_active = False
                    return

        self.conversation_active = True
        try:
            self.voice_ai.continuous_recognition(callback_function=handle_speech_result)
        except KeyboardInterrupt:
            print("\n🛑 Conversation ended")
        finally:
            self.conversation_active = False

    def process_voice_command(self, text):
        """Process voice commands and generate responses."""
        text_lower = text.lower().strip()

        if any(greeting in text_lower for greeting in ['hello', 'hi', 'hey']):
            return "Hello! I'm your Azure voice assistant. How can I help you today?"
        elif 'time' in text_lower:
            import datetime
            current_time = datetime.datetime.now()
            return f"The current time is {current_time.strftime('%I:%M %p on %B %d, %Y')}"
        elif 'date' in text_lower:
            import datetime
            current_date = datetime.datetime.now()
            return f"Today is {current_date.strftime('%A, %B %d, %Y')}"
        elif 'weather' in text_lower:
            return "I don't have access to weather data right now, but I'd be happy to help you find a weather service!"
        elif 'joke' in text_lower:
            import random
            jokes = [
                "Why don't scientists trust atoms? Because they make up everything!",
                "Why did the robot go to therapy? It had too many bugs in its system!",
                "What do you call a fake noodle? An impasta!",
                "Why don't programmers like nature? It has too many bugs!"
            ]
            return random.choice(jokes)
        elif 'name' in text_lower:
            return "I'm your Azure-powered voice assistant. You can call me Azure AI!"
        elif any(farewell in text_lower for farewell in ['goodbye', 'bye', 'exit', 'quit']):
            return "Goodbye! It was nice talking with you. Have a great day!"
        elif 'help' in text_lower:
            return ("I can help you with various tasks. Try asking me about the time, "
                    "date, tell you a joke, or just have a conversation. Say 'goodbye' when you're done!")
        else:
            # Echo back what was heard with a helpful response
            return (f"I heard you say: '{text}'. I'm still learning, but I'm here to help! "
                    "Try asking me about the time, for a joke, or just say hello!")

    def demo_advanced_features(self):
        """Demonstrate advanced Azure Speech features."""
        print("🎙️ Azure Speech Advanced Features Demo")

        # 1. List available voices
        print("\n1. Available Voices:")
        voices = self.voice_ai.get_available_voices()
        if 'error' not in voices:
            neural_voices = [v for v in voices if 'Neural' in v['name']][:5]
            for voice in neural_voices:
                print(f"  - {voice['display_name']} ({voice['locale']})")

        # 2. SSML example
        print("\n2. SSML (Speech Synthesis Markup Language) Demo:")
        ssml_text = """
        <speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis" xml:lang="en-US">
            <voice name="en-US-JennyNeural">
                <prosody rate="slow" pitch="low">
                    Hello, this is a demonstration of SSML.
                </prosody>
                <break time="1s"/>
                <prosody rate="fast" pitch="high">
                    I can change my speaking rate and pitch!
                </prosody>
                <break time="500ms"/>
                <emphasis level="strong">
                    And I can add emphasis to important words!
                </emphasis>
            </voice>
        </speak>
        """
        result = self.voice_ai.speak_ssml(ssml_text)
        print(f"  SSML Result: {result['status']}")

        # 3. Single recognition demo
        print("\n3. Single Recognition Demo:")
        print("  Say something (one phrase)...")
        result = self.voice_ai.recognize_once()
        print(f"  Recognized: {result['text']} (Reason: {result['reason']})")


# Usage examples
if __name__ == "__main__":
    # Make sure to set these environment variables:
    # AZURE_SPEECH_KEY=your_azure_speech_key
    # AZURE_SPEECH_REGION=your_azure_region (e.g., eastus)
    try:
        # Create the conversational AI
        conv_ai = AzureConversationalAI()

        # Run the advanced features demo
        print("=== Azure Speech Services Demo ===")
        conv_ai.demo_advanced_features()

        print("\n" + "=" * 50)

        # Start the interactive conversation
        conv_ai.start_conversation()
    except Exception as e:
        print(f"❌ Error initializing Azure Voice AI: {e}")
        print("💡 Make sure you have set AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environment variables")
```
.env Configuration
# .env file
AZURE_SPEECH_KEY=your_azure_speech_key_here
AZURE_SPEECH_REGION=eastus
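Loading these values at runtime can be sketched as below. This is a minimal example, assuming the optional python-dotenv package (`pip install python-dotenv`) for local development; `load_speech_config` is an illustrative helper name, and in production the variables are usually injected by the environment rather than a .env file.

```python
# Sketch: reading the .env values above. python-dotenv is optional - without
# it, the code falls back to variables already exported in the shell.
import os

def load_speech_config(env=os.environ) -> dict:
    """Return Azure Speech settings, defaulting the region to eastus."""
    key = env.get("AZURE_SPEECH_KEY")
    if not key:
        raise RuntimeError("AZURE_SPEECH_KEY is not set - check your .env file")
    return {"key": key, "region": env.get("AZURE_SPEECH_REGION", "eastus")}

if __name__ == "__main__":
    try:
        from dotenv import load_dotenv  # hypothetical local-dev convenience
        load_dotenv()  # pulls .env entries into os.environ
    except ImportError:
        pass  # rely on variables exported in the shell
    print(load_speech_config())
```

Failing fast on a missing key gives a clearer error than the SDK's own authentication failure later on.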

Production Voice AI Architecture

A production-ready system needs multi-provider failover, WebSocket streaming and connection management. Here's a reference implementation (the per-provider methods are stubbed — drop in the provider code from the earlier sections):

Multi-Provider Voice AI Manager

Production Voice AI Manager
import asyncio
import logging
from abc import ABC, abstractmethod
from enum import Enum
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import time
import json


class VoiceProvider(Enum):
    OPENAI = "openai"
    GOOGLE = "google"
    AWS = "aws"
    AZURE = "azure"


@dataclass
class VoiceResult:
    text: str
    confidence: float
    provider: VoiceProvider
    processing_time: float
    is_final: bool = True
    error: Optional[str] = None


class VoiceAIProvider(ABC):
    """Abstract base class for voice AI providers"""

    @abstractmethod
    async def transcribe_audio(self, audio_data: bytes) -> VoiceResult:
        pass

    @abstractmethod
    async def generate_speech(self, text: str) -> bytes:
        pass

    @abstractmethod
    def is_available(self) -> bool:
        pass


class OpenAIProvider(VoiceAIProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = None  # Initialize OpenAI client

    async def transcribe_audio(self, audio_data: bytes) -> VoiceResult:
        start_time = time.time()
        try:
            # OpenAI Whisper implementation
            # (Use the previous OpenAI code here)
            processing_time = time.time() - start_time
            return VoiceResult(
                text="transcribed_text",  # Replace with actual result
                confidence=0.95,
                provider=VoiceProvider.OPENAI,
                processing_time=processing_time
            )
        except Exception as e:
            return VoiceResult(
                text="",
                confidence=0.0,
                provider=VoiceProvider.OPENAI,
                processing_time=time.time() - start_time,
                error=str(e)
            )

    async def generate_speech(self, text: str) -> bytes:
        # OpenAI TTS implementation
        return b"audio_data"

    def is_available(self) -> bool:
        return self.api_key is not None


class MultiProviderVoiceAI:
    """Voice AI system with multiple provider support and fallback"""

    def __init__(self):
        self.providers: Dict[VoiceProvider, VoiceAIProvider] = {}
        self.primary_provider = VoiceProvider.OPENAI
        self.fallback_order = [
            VoiceProvider.OPENAI,
            VoiceProvider.GOOGLE,
            VoiceProvider.AZURE,
            VoiceProvider.AWS
        ]

        # Performance tracking
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'provider_usage': {provider: 0 for provider in VoiceProvider},
            'average_latency': {},
            'error_count': {provider: 0 for provider in VoiceProvider}
        }

        # Setup logging
        self.logger = logging.getLogger(__name__)

    def add_provider(self, provider: VoiceProvider, instance: VoiceAIProvider):
        """Add a voice AI provider"""
        self.providers[provider] = instance
        self.logger.info(f"Added provider: {provider.value}")

    async def transcribe_with_fallback(self, audio_data: bytes) -> VoiceResult:
        """Transcribe audio with automatic provider fallback"""
        self.metrics['total_requests'] += 1

        for provider_type in self.fallback_order:
            if provider_type not in self.providers:
                continue

            provider = self.providers[provider_type]
            if not provider.is_available():
                self.logger.warning(f"Provider {provider_type.value} is not available")
                continue

            try:
                self.logger.info(f"Attempting transcription with {provider_type.value}")
                result = await provider.transcribe_audio(audio_data)

                if result.error is None:
                    # Success
                    self.metrics['successful_requests'] += 1
                    self.metrics['provider_usage'][provider_type] += 1
                    self._update_latency_metrics(provider_type, result.processing_time)

                    self.logger.info(
                        f"Transcription successful with {provider_type.value} "
                        f"(confidence: {result.confidence:.2f}, "
                        f"time: {result.processing_time:.2f}s)"
                    )
                    return result
                else:
                    self.metrics['error_count'][provider_type] += 1
                    self.logger.error(f"{provider_type.value} error: {result.error}")
            except Exception as e:
                self.metrics['error_count'][provider_type] += 1
                self.logger.error(f"Provider {provider_type.value} failed: {e}")
                continue

        # All providers failed
        self.logger.error("All voice providers failed")
        return VoiceResult(
            text="",
            confidence=0.0,
            provider=VoiceProvider.OPENAI,  # Default
            processing_time=0.0,
            error="All providers failed"
        )

    async def generate_speech_with_fallback(self, text: str) -> Optional[bytes]:
        """Generate speech with provider fallback"""
        for provider_type in self.fallback_order:
            if provider_type not in self.providers:
                continue

            provider = self.providers[provider_type]
            if not provider.is_available():
                continue

            try:
                audio_data = await provider.generate_speech(text)
                self.logger.info(f"Speech generated successfully with {provider_type.value}")
                return audio_data
            except Exception as e:
                self.logger.error(f"Speech generation failed with {provider_type.value}: {e}")
                continue

        self.logger.error("All speech providers failed")
        return None

    def _update_latency_metrics(self, provider: VoiceProvider, latency: float):
        """Update average latency metrics"""
        if provider not in self.metrics['average_latency']:
            self.metrics['average_latency'][provider] = []

        # Keep last 100 measurements for rolling average
        self.metrics['average_latency'][provider].append(latency)
        if len(self.metrics['average_latency'][provider]) > 100:
            self.metrics['average_latency'][provider].pop(0)

    def get_metrics(self) -> Dict[str, Any]:
        """Get performance metrics"""
        metrics = self.metrics.copy()

        # Convert enum keys to strings so the metrics dict is JSON-serialisable
        metrics['provider_usage'] = {p.value: n for p, n in self.metrics['provider_usage'].items()}
        metrics['error_count'] = {p.value: n for p, n in self.metrics['error_count'].items()}
        metrics['average_latency'] = {p.value: v for p, v in self.metrics['average_latency'].items()}

        # Calculate average latencies
        for provider, latencies in self.metrics['average_latency'].items():
            if latencies:
                metrics[f'avg_latency_{provider.value}'] = sum(latencies) / len(latencies)

        # Calculate success rate
        if self.metrics['total_requests'] > 0:
            metrics['success_rate'] = (
                self.metrics['successful_requests'] / self.metrics['total_requests']
            ) * 100

        return metrics

    def get_health_status(self) -> Dict[str, Any]:
        """Get system health status"""
        status = {
            'overall_health': 'healthy',
            'providers': {},
            'total_providers': len(self.providers),
            'available_providers': 0
        }

        for provider_type, provider in self.providers.items():
            is_available = provider.is_available()
            if is_available:
                status['available_providers'] += 1

            status['providers'][provider_type.value] = {
                'available': is_available,
                'error_count': self.metrics['error_count'][provider_type],
                'usage_count': self.metrics['provider_usage'][provider_type]
            }

        # Determine overall health
        if status['available_providers'] == 0:
            status['overall_health'] = 'critical'
        elif status['available_providers'] < len(self.providers) / 2:
            status['overall_health'] = 'degraded'

        return status


class VoiceAIWebSocketHandler:
    """WebSocket handler for real-time voice AI"""

    def __init__(self, voice_ai: MultiProviderVoiceAI):
        self.voice_ai = voice_ai
        self.active_sessions: Dict[str, Dict] = {}
        self.logger = logging.getLogger(__name__)

    async def handle_connection(self, websocket, path):
        """Handle WebSocket connection"""
        session_id = self._generate_session_id()
        self.active_sessions[session_id] = {
            'websocket': websocket,
            'created_at': time.time(),
            'audio_buffer': b'',
            'context': {}
        }

        self.logger.info(f"New voice session: {session_id}")

        try:
            await self._session_loop(session_id)
        except Exception as e:
            self.logger.error(f"Session {session_id} error: {e}")
        finally:
            if session_id in self.active_sessions:
                del self.active_sessions[session_id]
            self.logger.info(f"Session ended: {session_id}")

    async def _session_loop(self, session_id: str):
        """Main session processing loop"""
        session = self.active_sessions[session_id]
        websocket = session['websocket']

        async for message in websocket:
            try:
                if isinstance(message, bytes):
                    # Audio data
                    await self._process_audio_data(session_id, message)
                else:
                    # Text message (JSON)
                    data = json.loads(message)
                    await self._process_text_message(session_id, data)
            except Exception as e:
                self.logger.error(f"Message processing error: {e}")
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': str(e)
                }))

    async def _process_audio_data(self, session_id: str, audio_data: bytes):
        """Process incoming audio data"""
        session = self.active_sessions[session_id]

        # Add to buffer
        session['audio_buffer'] += audio_data

        # Process when buffer reaches ~3 seconds of audio
        # (16 kHz * 2 bytes * 1 channel = 32,000 bytes/s, so 3 s = 96,000 bytes)
        if len(session['audio_buffer']) >= 96000:
            result = await self.voice_ai.transcribe_with_fallback(session['audio_buffer'])

            # Send result back to client
            await session['websocket'].send(json.dumps({
                'type': 'transcription',
                'text': result.text,
                'confidence': result.confidence,
                'provider': result.provider.value,
                'processing_time': result.processing_time,
                'is_final': result.is_final
            }))

            # Clear buffer
            session['audio_buffer'] = b''

            # Generate response if transcription was successful
            if result.text and not result.error:
                response_text = await self._generate_response(result.text, session['context'])

                if response_text:
                    # Send text response
                    await session['websocket'].send(json.dumps({
                        'type': 'response',
                        'text': response_text
                    }))

                    # Generate and send audio response
                    audio_response = await self.voice_ai.generate_speech_with_fallback(response_text)
                    if audio_response:
                        await session['websocket'].send(audio_response)

    async def _process_text_message(self, session_id: str, data: Dict):
        """Process text-based messages"""
        session = self.active_sessions[session_id]

        if data.get('type') == 'config':
            # Update session configuration
            session['context'].update(data.get('config', {}))
            await session['websocket'].send(json.dumps({
                'type': 'config_updated',
                'status': 'success'
            }))
        elif data.get('type') == 'metrics_request':
            # Send metrics
            metrics = self.voice_ai.get_metrics()
            await session['websocket'].send(json.dumps({
                'type': 'metrics',
                'data': metrics
            }))

    async def _generate_response(self, text: str, context: Dict) -> Optional[str]:
        """Generate AI response based on input text and context"""
        # This would typically integrate with your AI/NLP service
        # For now, simple keyword-based responses
        text_lower = text.lower()

        if 'hello' in text_lower or 'hi' in text_lower:
            return "Hello! I'm your voice assistant. How can I help you?"
        elif 'time' in text_lower:
            import datetime
            return f"The current time is {datetime.datetime.now().strftime('%I:%M %p')}"
        elif 'goodbye' in text_lower or 'bye' in text_lower:
            return "Goodbye! Have a great day!"
        else:
            return f"I heard you say: {text}. How can I help you with that?"

    def _generate_session_id(self) -> str:
        """Generate unique session ID"""
        import uuid
        return str(uuid.uuid4())[:8]


# Usage example
async def main():
    """Example usage of the multi-provider voice AI system"""
    # Initialize the multi-provider system
    voice_ai = MultiProviderVoiceAI()

    # Add providers (you would initialize these with real credentials)
    # voice_ai.add_provider(VoiceProvider.OPENAI, OpenAIProvider(api_key="..."))
    # voice_ai.add_provider(VoiceProvider.GOOGLE, GoogleProvider(credentials="..."))

    # Test transcription with fallback
    # audio_data = b"..."  # Your audio data
    # result = await voice_ai.transcribe_with_fallback(audio_data)
    # print(f"Result: {result.text} (Provider: {result.provider.value})")

    # Get metrics
    metrics = voice_ai.get_metrics()
    print("Performance Metrics:", json.dumps(metrics, indent=2))

    # Get health status
    health = voice_ai.get_health_status()
    print("Health Status:", json.dumps(health, indent=2))


if __name__ == "__main__":
    asyncio.run(main())

WebSocket Client

JavaScript WebSocket Voice Client
class VoiceAIClient {
    constructor(websocketUrl) {
        this.websocketUrl = websocketUrl;
        this.websocket = null;
        this.mediaRecorder = null;
        this.audioContext = null;
        this.isRecording = false;

        // Event handlers
        this.onTranscription = null;
        this.onResponse = null;
        this.onAudioResponse = null;
        this.onError = null;
    }

    async connect() {
        return new Promise((resolve, reject) => {
            this.websocket = new WebSocket(this.websocketUrl);

            this.websocket.onopen = () => {
                console.log('🔗 Connected to Voice AI WebSocket');
                resolve();
            };

            this.websocket.onmessage = (event) => {
                this.handleMessage(event);
            };

            this.websocket.onerror = (error) => {
                console.error('❌ WebSocket error:', error);
                if (this.onError) this.onError(error);
                reject(error);
            };

            this.websocket.onclose = () => {
                console.log('🔌 WebSocket connection closed');
            };
        });
    }

    handleMessage(event) {
        if (event.data instanceof Blob) {
            // Audio response
            if (this.onAudioResponse) {
                this.onAudioResponse(event.data);
            }
            return;
        }

        try {
            const data = JSON.parse(event.data);

            switch (data.type) {
                case 'transcription':
                    if (this.onTranscription) {
                        this.onTranscription(data);
                    }
                    console.log(`🗣️ Transcription: ${data.text} (${data.provider})`);
                    break;
                case 'response':
                    if (this.onResponse) {
                        this.onResponse(data);
                    }
                    console.log(`🤖 Response: ${data.text}`);
                    break;
                case 'error':
                    console.error('❌ Server error:', data.message);
                    if (this.onError) this.onError(data);
                    break;
                case 'metrics':
                    console.log('📊 Metrics:', data.data);
                    break;
            }
        } catch (e) {
            console.error('❌ Failed to parse message:', e);
        }
    }

    async startRecording() {
        if (this.isRecording) return;

        try {
            // Get microphone access
            const stream = await navigator.mediaDevices.getUserMedia({
                audio: {
                    sampleRate: 16000,
                    channelCount: 1,
                    echoCancellation: true,
                    noiseSuppression: true
                }
            });

            // Setup audio context for processing
            this.audioContext = new (window.AudioContext || window.webkitAudioContext)({
                sampleRate: 16000
            });

            const source = this.audioContext.createMediaStreamSource(stream);
            const processor = this.audioContext.createScriptProcessor(4096, 1, 1);

            processor.onaudioprocess = (event) => {
                if (this.isRecording && this.websocket.readyState === WebSocket.OPEN) {
                    const inputBuffer = event.inputBuffer.getChannelData(0);

                    // Convert float32 to int16
                    const pcmData = new Int16Array(inputBuffer.length);
                    for (let i = 0; i < inputBuffer.length; i++) {
                        pcmData[i] = Math.max(-32768, Math.min(32767, inputBuffer[i] * 32768));
                    }

                    // Send audio data to server
                    this.websocket.send(pcmData.buffer);
                }
            };

            source.connect(processor);
            processor.connect(this.audioContext.destination);

            this.isRecording = true;
            console.log('🎤 Recording started');
        } catch (error) {
            console.error('❌ Failed to start recording:', error);
            if (this.onError) this.onError(error);
        }
    }

    stopRecording() {
        if (!this.isRecording) return;

        this.isRecording = false;

        if (this.audioContext) {
            this.audioContext.close();
            this.audioContext = null;
        }

        console.log('🛑 Recording stopped');
    }

    sendConfig(config) {
        if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
            this.websocket.send(JSON.stringify({
                type: 'config',
                config: config
            }));
        }
    }

    requestMetrics() {
        if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
            this.websocket.send(JSON.stringify({ type: 'metrics_request' }));
        }
    }

    playAudioResponse(audioBlob) {
        const audioUrl = URL.createObjectURL(audioBlob);
        const audio = new Audio(audioUrl);

        audio.onended = () => {
            URL.revokeObjectURL(audioUrl);
        };

        audio.play().catch(error => {
            console.error('❌ Failed to play audio:', error);
        });
    }

    disconnect() {
        this.stopRecording();
        if (this.websocket) {
            this.websocket.close();
            this.websocket = null;
        }
    }
}

// Usage example
const voiceClient = new VoiceAIClient('ws://localhost:8080/voice');

// Set up event handlers
voiceClient.onTranscription = (data) => {
    document.getElementById('transcription').textContent = data.text;
    document.getElementById('confidence').textContent = `${(data.confidence * 100).toFixed(1)}%`;
};

voiceClient.onResponse = (data) => {
    document.getElementById('response').textContent = data.text;
};

voiceClient.onAudioResponse = (audioBlob) => {
    voiceClient.playAudioResponse(audioBlob);
};

voiceClient.onError = (error) => {
    console.error('Voice AI Error:', error);
};

// Connect and start
async function startVoiceAI() {
    try {
        await voiceClient.connect();
        await voiceClient.startRecording();

        // Send initial configuration
        voiceClient.sendConfig({
            language: 'en-US',
            model: 'latest',
            enableProfanityFilter: true
        });
    } catch (error) {
        console.error('Failed to start Voice AI:', error);
    }
}

// HTML interface
document.addEventListener('DOMContentLoaded', () => {
    const startBtn = document.getElementById('startVoiceAI');
    const stopBtn = document.getElementById('stopVoiceAI');
    const metricsBtn = document.getElementById('getMetrics');

    startBtn.addEventListener('click', startVoiceAI);
    stopBtn.addEventListener('click', () => {
        voiceClient.stopRecording();
    });
    metricsBtn.addEventListener('click', () => {
        voiceClient.requestMetrics();
    });
});

Troubleshooting

Audio Quality Issues

Symptoms: Poor recognition accuracy, garbled output, echo.
Common causes: Wrong sample rate, incorrect encoding, noisy input, clipping.
Fixes: Use 16 kHz / 16-bit PCM. Apply noise gate. Check microphone gain. Add echo cancellation.
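The conversion to 16 kHz / 16-bit PCM can be sketched as below. This is a minimal example using scipy's polyphase resampler; `to_16k_pcm` is an illustrative helper name, and the float sample array plus its source rate are assumptions — adapt them to however you capture audio.

```python
# Sketch: convert arbitrary captured audio to the 16 kHz / 16-bit mono PCM
# that most STT services expect. Input is a float array in [-1, 1].
import numpy as np
from scipy.signal import resample_poly

def to_16k_pcm(samples: np.ndarray, source_rate: int) -> bytes:
    """Downmix to mono, resample to 16 kHz, quantise to int16 PCM bytes."""
    if samples.ndim == 2:  # (n_samples, n_channels) -> mono
        samples = samples.mean(axis=1)
    if source_rate != 16000:
        g = np.gcd(source_rate, 16000)
        samples = resample_poly(samples, 16000 // g, source_rate // g)
    samples = np.clip(samples, -1.0, 1.0)  # guard against resampler overshoot
    return (samples * 32767).astype(np.int16).tobytes()
```

One second of 44.1 kHz stereo input comes out as 32,000 bytes (16,000 samples × 2 bytes), which matches the buffering math used elsewhere in this guide.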
Audio Quality Analyzer
import numpy as np
import librosa
import matplotlib.pyplot as plt
from scipy import signal


class AudioQualityAnalyzer:
    def __init__(self):
        self.sample_rate = 16000

    def analyze_audio_file(self, audio_file_path):
        """Comprehensive audio quality analysis"""
        # Load audio
        audio_data, sr = librosa.load(audio_file_path, sr=self.sample_rate)

        analysis = {
            'file_path': audio_file_path,
            'duration': len(audio_data) / sr,
            'sample_rate': sr,
            'channels': 1,  # librosa loads as mono by default
            'bit_depth': '32-bit float (loaded)',
            'file_size_mb': len(audio_data) * 4 / (1024 * 1024)  # 4 bytes per float32
        }

        # Signal quality metrics
        analysis.update(self._analyze_signal_quality(audio_data, sr))

        # Frequency analysis
        analysis.update(self._analyze_frequency_content(audio_data, sr))

        # Voice activity detection
        analysis.update(self._detect_voice_activity(audio_data, sr))

        # Recommendations
        analysis['recommendations'] = self._generate_recommendations(analysis)

        return analysis

    def _analyze_signal_quality(self, audio_data, sr):
        """Analyze basic signal quality metrics"""
        # RMS (Root Mean Square) - overall loudness
        rms = np.sqrt(np.mean(audio_data**2))

        # Peak amplitude
        peak = np.max(np.abs(audio_data))

        # Dynamic range
        dynamic_range = 20 * np.log10(peak / (rms + 1e-10))

        # Signal-to-Noise Ratio estimation
        # Find quiet segments (bottom 10% of RMS values)
        frame_length = int(0.025 * sr)  # 25ms frames
        hop_length = int(0.010 * sr)    # 10ms hop
        frames = librosa.util.frame(audio_data, frame_length=frame_length, hop_length=hop_length)
        frame_rms = np.sqrt(np.mean(frames**2, axis=0))

        noise_threshold = np.percentile(frame_rms, 10)
        signal_power = np.mean(frame_rms[frame_rms > noise_threshold]**2)
        noise_power = np.mean(frame_rms[frame_rms <= noise_threshold]**2)
        snr = 10 * np.log10(signal_power / (noise_power + 1e-10))

        # Clipping detection
        clipping_threshold = 0.95
        clipped_samples = np.sum(np.abs(audio_data) > clipping_threshold)
        clipping_percentage = (clipped_samples / len(audio_data)) * 100

        return {
            'rms_level': float(rms),
            'peak_amplitude': float(peak),
            'dynamic_range_db': float(dynamic_range),
            'estimated_snr_db': float(snr),
            'clipping_percentage': float(clipping_percentage),
            'is_clipped': clipping_percentage > 0.1
        }

    def _analyze_frequency_content(self, audio_data, sr):
        """Analyze frequency content for speech optimization"""
        # Compute power spectral density
        frequencies, psd = signal.welch(audio_data, sr, nperseg=1024)

        # Key frequency ranges for speech
        ranges = {
            'fundamental_freq': (80, 300),    # Fundamental frequency range
            'formant_range': (300, 3400),     # Main formant range
            'consonant_range': (2000, 8000),  # Consonant clarity range
            'full_bandwidth': (20, 8000)      # Full audio bandwidth
        }

        frequency_analysis = {}
        for range_name, (low_freq, high_freq) in ranges.items():
            # Find frequency indices
            low_idx = np.argmin(np.abs(frequencies - low_freq))
            high_idx = np.argmin(np.abs(frequencies - high_freq))

            # Calculate average power in range
            avg_power = np.mean(psd[low_idx:high_idx])
            frequency_analysis[f'{range_name}_power'] = float(avg_power)

        # Spectral centroid (brightness)
        spectral_centroid = np.sum(frequencies * psd) / np.sum(psd)
        frequency_analysis['spectral_centroid_hz'] = float(spectral_centroid)

        # Bandwidth (spectral spread)
        spectral_spread = np.sqrt(np.sum(((frequencies - spectral_centroid) ** 2) * psd) / np.sum(psd))
        frequency_analysis['spectral_spread_hz'] = float(spectral_spread)

        return frequency_analysis

    def _detect_voice_activity(self, audio_data, sr):
        """Detect voice activity and speech characteristics"""
        # Frame the audio
        frame_length = int(0.025 * sr)  # 25ms
        hop_length = int(0.010 * sr)    # 10ms

        # Energy-based VAD
        frames = librosa.util.frame(audio_data, frame_length=frame_length, hop_length=hop_length)
        frame_energy = np.sum(frames**2, axis=0)

        # Adaptive threshold
        energy_threshold = np.percentile(frame_energy, 30)  # Bottom 30% as silence
        voice_frames = frame_energy > energy_threshold

        # Speech statistics
        total_frames = len(voice_frames)
        speech_frames = np.sum(voice_frames)
        speech_ratio = speech_frames / total_frames if total_frames > 0 else 0

        # Find speech segments
        speech_segments = []
        in_speech = False
        segment_start = 0

        for i, is_voice in enumerate(voice_frames):
            if is_voice and not in_speech:
                # Start of speech segment
                segment_start = i * hop_length / sr
                in_speech = True
            elif not is_voice and in_speech:
                # End of speech segment
                segment_end = i * hop_length / sr
                speech_segments.append((segment_start, segment_end))
                in_speech = False

        return {
            'speech_ratio': float(speech_ratio),
            'total_speech_segments': len(speech_segments),
            'average_segment_length': float(np.mean([end - start for start, end in speech_segments])) if speech_segments else 0,
            'silence_ratio': float(1 - speech_ratio),
            'voice_activity_detected': speech_ratio > 0.1
        }

    def _generate_recommendations(self, analysis):
        """Generate optimization recommendations based on analysis"""
        recommendations = []

        # Audio level recommendations
        if analysis['rms_level'] < 0.01:
            recommendations.append("⚠️ Audio level too low - increase microphone gain or speak louder")
        elif analysis['rms_level'] > 0.5:
            recommendations.append("⚠️ Audio level too high - reduce microphone gain to prevent clipping")

        # Clipping check
        if analysis['is_clipped']:
            recommendations.append("❌ Audio clipping detected - reduce input gain immediately")

        # SNR recommendations
        if analysis['estimated_snr_db'] < 10:
            recommendations.append("🔇 Low signal-to-noise ratio - use noise cancellation or quieter environment")
        elif analysis['estimated_snr_db'] > 25:
            recommendations.append("✅ Excellent signal-to-noise ratio")

        # Speech content
        if not analysis['voice_activity_detected']:
            recommendations.append("❌ No speech detected - check microphone and speak clearly")
        elif analysis['speech_ratio'] < 0.3:
            recommendations.append("⚠️ Low speech content - too much silence in recording")

        # Frequency content
        if analysis['formant_range_power'] < analysis['fundamental_freq_power'] * 0.1:
            recommendations.append("⚠️ Weak formant frequencies - may affect speech recognition")

        # File quality
        if analysis['duration'] < 1.0:
            recommendations.append("⚠️ Very short audio clip - longer samples improve accuracy")
        elif analysis['duration'] > 30.0:
            recommendations.append("💡 Long audio clip - consider chunking for better real-time performance")

        if not recommendations:
            recommendations.append("✅ Audio quality looks good for speech recognition")

        return recommendations

    def plot_analysis(self, analysis, audio_file_path):
        """Create visualization plots for audio analysis"""
        # Load audio for plotting
        audio_data, sr = librosa.load(audio_file_path, sr=self.sample_rate)

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Audio Analysis: {audio_file_path}', fontsize=16)

        # Time domain plot
        time = np.linspace(0, len(audio_data) / sr, len(audio_data))
        axes[0, 0].plot(time, audio_data)
        axes[0, 0].set_title('Waveform')
        axes[0, 0].set_xlabel('Time (s)')
        axes[0, 0].set_ylabel('Amplitude')
        axes[0, 0].grid(True)

        # Frequency spectrum
        frequencies, psd = signal.welch(audio_data, sr, nperseg=1024)
        axes[0, 1].semilogx(frequencies, 10 * np.log10(psd))
        axes[0, 1].set_title('Power Spectral Density')
        axes[0, 1].set_xlabel('Frequency (Hz)')
        axes[0, 1].set_ylabel('Power (dB)')
        axes[0, 1].grid(True)

        # Spectrogram
        f, t, Sxx = signal.spectrogram(audio_data, sr)
        axes[1, 0].pcolormesh(t, f, 10 * np.log10(Sxx + 1e-10))
        axes[1, 0].set_title('Spectrogram')
        axes[1, 0].set_xlabel('Time (s)')
        axes[1, 0].set_ylabel('Frequency (Hz)')

        # Quality metrics bar chart
        metrics = {
            'RMS Level': analysis['rms_level'],
            'Peak Amp': analysis['peak_amplitude'],
            'SNR (dB)': analysis['estimated_snr_db'] / 30,  # Normalize for display
            'Speech Ratio': analysis['speech_ratio']
        }
        bars = axes[1, 1].bar(metrics.keys(), metrics.values())
        axes[1, 1].set_title('Quality Metrics')
        axes[1, 1].set_ylabel('Normalized Value')
        axes[1, 1].tick_params(axis='x', rotation=45)

        # Color code bars
        colors = ['red' if v < 0.3 else 'orange' if v < 0.7 else 'green' for v in metrics.values()]
        for bar, color in zip(bars, colors):
            bar.set_color(color)

        plt.tight_layout()
        plt.show()

        return fig


# Usage example
if __name__ == "__main__":
    analyzer = AudioQualityAnalyzer()

    # Analyze audio file
    analysis = analyzer.analyze_audio_file("test_audio.wav")

    # Print analysis results
    print("=== AUDIO QUALITY ANALYSIS ===")
    print(f"Duration: {analysis['duration']:.2f}s")
    print(f"RMS Level: {analysis['rms_level']:.4f}")
    print(f"Peak Amplitude: {analysis['peak_amplitude']:.4f}")
    print(f"SNR: {analysis['estimated_snr_db']:.1f} dB")
    print(f"Speech Ratio: {analysis['speech_ratio']:.1%}")
    print(f"Clipping: {'Yes' if analysis['is_clipped'] else 'No'}")

    print("\n=== RECOMMENDATIONS ===")
    for rec in analysis['recommendations']:
        print(rec)

    # Create plots
    analyzer.plot_analysis(analysis, "test_audio.wav")

API Integration Issues

Rate Limiting & Retry Logic
import asyncio
import time
import random
from typing import Optional, Callable, Any


class RateLimiter:
    def __init__(self, max_requests_per_minute: int = 60):
        self.max_requests = max_requests_per_minute
        self.requests = []
        self.lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self.lock:
                now = time.time()

                # Remove requests older than 1 minute
                self.requests = [req_time for req_time in self.requests if now - req_time < 60]

                if len(self.requests) < self.max_requests:
                    self.requests.append(now)
                    return

                # Too many recent requests - wait until the oldest one expires
                sleep_time = 60 - (now - self.requests[0])

            # Sleep outside the lock (re-acquiring it while held would deadlock),
            # then loop and re-check
            await asyncio.sleep(max(sleep_time, 0))


class RetryHandler:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with exponential backoff retry logic"""
        last_exception = None

        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                last_exception = e

                if attempt == self.max_retries:
                    # Final attempt failed
                    break

                # Calculate delay with exponential backoff and jitter
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                jitter = random.uniform(0, 0.1) * delay  # Add up to 10% jitter
                total_delay = delay + jitter

                print(f"⚠️ Attempt {attempt + 1} failed: {e}")
                print(f"🔄 Retrying in {total_delay:.1f} seconds...")
                await asyncio.sleep(total_delay)

        # All retries exhausted
        raise last_exception


class RobustVoiceAI:
    def __init__(self):
        self.rate_limiter = RateLimiter(max_requests_per_minute=50)
        self.retry_handler = RetryHandler(max_retries=3)
        self.providers = []  # List of voice AI providers
        self.current_provider_index = 0

    async def transcribe_with_resilience(self, audio_data: bytes) -> Optional[str]:
        """Transcribe with rate limiting, retries, and provider failover"""
        # Apply rate limiting
        await self.rate_limiter.acquire()

        # Try each provider
        for provider_attempt in range(len(self.providers)):
            current_provider = self.providers[self.current_provider_index]

            try:
                print(f"🔄 Using provider: {current_provider.name}")

                # Execute with retry logic
                result = await self.retry_handler.execute_with_retry(
                    self._transcribe_with_provider,
                    current_provider,
                    audio_data
                )
                return result
            except Exception as e:
                print(f"❌ Provider {current_provider.name} failed: {e}")

                # Switch to next provider
                self.current_provider_index = (self.current_provider_index + 1) % len(self.providers)

                # If this was the last provider, give up
                if provider_attempt == len(self.providers) - 1:
                    print("❌ All providers failed")
                    raise e

        return None

    async def _transcribe_with_provider(self, provider, audio_data: bytes) -> str:
        """Internal method to transcribe with specific provider"""
        # This would call the actual provider's API
        # Implementation depends on the provider (OpenAI, Google, etc.)
        pass


# Network troubleshooting utilities
class NetworkDiagnostics:
    @staticmethod
    async def test_connectivity(urls: list) -> dict:
        """Test connectivity to voice AI service endpoints"""
        import aiohttp

        results = {}
        async with aiohttp.ClientSession() as session:
            for url in urls:
                try:
                    start_time = time.time()
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                        latency = (time.time() - start_time) * 1000
                        results[url] = {
                            'status': 'success',
                            'status_code': response.status,
                            'latency_ms': round(latency, 2)
                        }
                except Exception as e:
                    results[url] = {
                        'status': 'failed',
                        'error': str(e)
                    }
        return results

    @staticmethod
    def diagnose_audio_issues():
        """Diagnose common audio setup issues"""
        issues = []
        recommendations = []

        # Check PyAudio installation
        try:
            import pyaudio
            pa = pyaudio.PyAudio()

            # Check for input devices
            input_devices = []
            for i in range(pa.get_device_count()):
                device_info = pa.get_device_info_by_index(i)
                if device_info['maxInputChannels'] > 0:
                    input_devices.append(device_info)

            if not input_devices:
                issues.append("❌ No audio input devices detected")
                recommendations.append("🔧 Check microphone connection and drivers")
            else:
                print(f"✅ Found {len(input_devices)} audio input device(s)")

            pa.terminate()
        except ImportError:
            issues.append("❌ PyAudio not installed")
            recommendations.append("🔧 Install PyAudio: pip install pyaudio")
        except Exception as e:
            issues.append(f"❌ PyAudio error: {e}")
            recommendations.append("🔧 Reinstall PyAudio or check audio drivers")

        # Check internet connectivity
        import socket
        try:
            socket.create_connection(("8.8.8.8", 53), timeout=3)
            print("✅ Internet connectivity OK")
        except OSError:
            issues.append("❌ No internet connectivity")
            recommendations.append("🔧 Check network connection")

        return {
            'issues': issues,
            'recommendations': recommendations
        }


# Usage example
async def main():
    # Test network connectivity
    endpoints = [
        'https://api.openai.com',
        'https://speech.googleapis.com',
        'https://transcribe.us-east-1.amazonaws.com'
    ]

    print("🔍 Testing connectivity...")
    connectivity = await NetworkDiagnostics.test_connectivity(endpoints)
    for url, result in connectivity.items():
        if result['status'] == 'success':
            print(f"✅ {url}: {result['latency_ms']}ms")
        else:
            print(f"❌ {url}: {result['error']}")

    # Diagnose audio issues
    print("\n🔍 Diagnosing audio setup...")
    audio_diagnosis = NetworkDiagnostics.diagnose_audio_issues()

    if audio_diagnosis['issues']:
        for issue in audio_diagnosis['issues']:
            print(issue)
        print("\n💡 Recommendations:")
        for rec in audio_diagnosis['recommendations']:
            print(rec)


if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Security & Privacy

  • Encrypt all audio data in transit (TLS 1.3) and at rest (AES-256)
  • Implement access controls — API key rotation, least-privilege IAM roles
  • GDPR compliance — delete audio after processing, anonymise transcripts
  • PII detection — use provider-built PII redaction (AWS, Azure) or custom filters
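A custom filter along these lines can serve as a fallback when managed redaction isn't available. This is a minimal sketch: `redact_pii` is an illustrative name and the regex patterns are deliberately simple examples, not an exhaustive PII catalogue — prefer the provider-side redaction features where they exist.

```python
# Sketch: minimal custom PII scrubber for transcripts. The patterns are
# illustrative only; real deployments should layer managed redaction on top.
import re

PII_PATTERNS = {
    "EMAIL": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"),
    "PHONE": re.compile(r"\b(?:\+?\d{1,2}[\s.-]?)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b"),
    "SSN":   re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

def redact_pii(transcript: str) -> str:
    """Replace matched PII spans with typed placeholders before storage."""
    for label, pattern in PII_PATTERNS.items():
        transcript = pattern.sub(f"[{label}]", transcript)
    return transcript
```

Running the scrubber before transcripts are logged or persisted keeps raw PII out of downstream systems even when a provider-side filter is misconfigured.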

Scalability

  • Connection pooling — reuse WebSocket connections across requests
  • Load balancing — distribute across regions for lower latency
  • Queue management — buffer audio chunks during traffic spikes
  • Auto-scaling — scale workers based on active connection count
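The queue-management point can be sketched with asyncio's bounded `Queue`, which gives back-pressure for free: `put()` suspends once the buffer is full, so memory stays capped during spikes. A minimal sketch with a hypothetical producer/worker pair (the STT call is stubbed out):

```python
import asyncio

async def producer(queue: asyncio.Queue, chunks):
    """Feed audio chunks into the buffer; blocks when the queue is full."""
    for chunk in chunks:
        await queue.put(chunk)
    await queue.put(None)  # sentinel: no more audio

async def worker(queue: asyncio.Queue, results: list):
    """Drain the buffer; stand-in for the real STT pipeline."""
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        results.append(len(chunk))  # replace with a transcription call
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=8)  # cap buffered chunks during spikes
    results = []
    await asyncio.gather(
        producer(queue, [b'\x00' * 320] * 20),
        worker(queue, results),
    )
    return results
```

The `maxsize` bound is the key design choice: an unbounded queue simply moves a traffic spike into memory, while a bounded one slows the producer instead.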

Cost Optimisation

  • Choose right models — use lightweight models for simple tasks, premium for accuracy
  • Optimise audio — downsample to 16 kHz, remove silence, compress where possible
  • Implement caching — cache common responses and TTS output
  • Monitor usage — set budget alerts, track cost per conversation
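Caching TTS output pays off quickly because receptionist-style prompts ("Please hold", greetings) repeat constantly. A minimal in-memory sketch — the `TTSCache` class and the `synthesize_fn` callback are illustrative assumptions, not a real provider API:

```python
import hashlib

class TTSCache:
    """Cache synthesized audio keyed by (voice, text) so a repeated
    prompt is only billed to the TTS provider once."""

    def __init__(self):
        self._store = {}
        self.hits = 0
        self.misses = 0

    def _key(self, text: str, voice: str) -> str:
        return hashlib.sha256(f'{voice}:{text}'.encode()).hexdigest()

    def get_or_synthesize(self, text, voice, synthesize_fn):
        """Return cached audio, calling synthesize_fn only on a miss."""
        key = self._key(text, voice)
        if key in self._store:
            self.hits += 1
        else:
            self.misses += 1
            self._store[key] = synthesize_fn(text, voice)
        return self._store[key]
```

In production you would back this with Redis or object storage and add an eviction policy, but the keying scheme stays the same.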

Testing Framework

Voice AI Testing Framework
import unittest
import asyncio
import time
import tempfile
import wave
import numpy as np
from unittest.mock import Mock, patch


class VoiceAITestFramework:
    """Comprehensive testing framework for Voice AI systems."""

    @staticmethod
    def generate_test_audio(duration=3.0, sample_rate=16000, frequency=440):
        """Generate a sine-wave test audio file and return its path."""
        t = np.linspace(0, duration, int(sample_rate * duration))
        audio_data = np.sin(2 * np.pi * frequency * t) * 0.5

        # Convert to 16-bit PCM
        audio_16bit = (audio_data * 32767).astype(np.int16)

        # Save to a temporary file
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            with wave.open(temp_file.name, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_16bit.tobytes())
        return temp_file.name

    @staticmethod
    def create_test_suite():
        """Create a comprehensive test suite."""

        class TestVoiceAI(unittest.TestCase):
            def setUp(self):
                """Set up test fixtures."""
                self.test_audio_file = VoiceAITestFramework.generate_test_audio()
                # Initialize your VoiceAI instance here
                # self.voice_ai = YourVoiceAI()

            def tearDown(self):
                """Clean up test fixtures."""
                import os
                if os.path.exists(self.test_audio_file):
                    os.unlink(self.test_audio_file)

            def test_audio_file_processing(self):
                """Test basic audio file processing."""
                # Test that the audio file can be processed without errors
                pass  # Implement your test logic

            def test_empty_audio_handling(self):
                """Test handling of empty or silent audio."""
                silent_audio = VoiceAITestFramework.generate_test_audio(
                    duration=1.0, frequency=0  # silent
                )
                # Test processing of silent audio
                pass

            def test_noisy_audio_processing(self):
                """Test processing of noisy audio."""
                noisy_audio = self.generate_noisy_audio()
                # Test processing
                pass

            def test_multiple_languages(self):
                """Test multi-language support."""
                languages = ['en-US', 'es-ES', 'fr-FR', 'de-DE']
                for lang in languages:
                    with self.subTest(language=lang):
                        # Test each language
                        pass

            def test_concurrent_requests(self):
                """Test concurrent processing capability."""
                async def run_concurrent_test():
                    tasks = []
                    for i in range(5):  # 5 concurrent requests
                        # Create a processing task here
                        pass
                    # Wait for all tasks to complete
                    await asyncio.gather(*tasks)
                asyncio.run(run_concurrent_test())

            def test_rate_limiting(self):
                """Test rate limiting functionality."""
                # Send requests faster than the rate limit and
                # verify proper rate-limiting behaviour
                pass

            def test_error_handling(self):
                """Test error handling and recovery."""
                error_conditions = [
                    'invalid_audio_format',
                    'network_timeout',
                    'api_quota_exceeded',
                    'invalid_credentials'
                ]
                for condition in error_conditions:
                    with self.subTest(error_condition=condition):
                        # Test each error condition
                        pass

            def test_audio_quality_requirements(self):
                """Test audio quality validation."""
                quality_tests = [
                    {'sample_rate': 8000, 'expected': 'low_quality'},
                    {'sample_rate': 16000, 'expected': 'good_quality'},
                    {'sample_rate': 44100, 'expected': 'high_quality'},
                ]
                for test_case in quality_tests:
                    with self.subTest(test_case=test_case):
                        # Generate audio with the specified quality
                        # and test the processing results
                        pass

            @patch('your_voice_ai_module.api_call')
            def test_api_mocking(self, mock_api):
                """Test with mocked API responses."""
                # Mock a successful response
                mock_api.return_value = {
                    'transcript': 'Hello world',
                    'confidence': 0.95
                }
                # Test processing with the mocked response
                pass

            def generate_noisy_audio(self, snr_db=10):
                """Generate audio with a controlled noise level."""
                clean_audio = VoiceAITestFramework.generate_test_audio()
                # Add noise (implementation depends on your needs)
                # and return the path to the noisy audio file
                return clean_audio

        return unittest.TestLoader().loadTestsFromTestCase(TestVoiceAI)


# Performance benchmarking
class VoiceAIBenchmark:
    def __init__(self, voice_ai_instance):
        self.voice_ai = voice_ai_instance
        self.results = []

    async def run_latency_benchmark(self, num_tests=50):
        """Benchmark processing latency."""
        print(f"🏃‍♂️ Running latency benchmark ({num_tests} tests)...")
        latencies = []

        for i in range(num_tests):
            test_audio = VoiceAITestFramework.generate_test_audio(duration=2.0)
            start_time = time.time()
            try:
                result = await self.voice_ai.transcribe_audio(test_audio)
                latency = time.time() - start_time
                latencies.append(latency)
                if (i + 1) % 10 == 0:
                    print(f"   Completed {i + 1}/{num_tests} tests...")
            except Exception as e:
                print(f"   ❌ Test {i + 1} failed: {e}")
            finally:
                import os
                if os.path.exists(test_audio):
                    os.unlink(test_audio)

        # Calculate statistics
        if latencies:
            stats = {
                'mean_latency': np.mean(latencies),
                'median_latency': np.median(latencies),
                'p95_latency': np.percentile(latencies, 95),
                'p99_latency': np.percentile(latencies, 99),
                'min_latency': np.min(latencies),
                'max_latency': np.max(latencies),
                'success_rate': len(latencies) / num_tests
            }
            print("📊 Benchmark Results:")
            for metric, value in stats.items():
                if 'latency' in metric:
                    print(f"   {metric}: {value:.3f}s")
                else:
                    print(f"   {metric}: {value:.3f}")
            return stats
        else:
            print("❌ All benchmark tests failed")
            return None


# Usage example
if __name__ == "__main__":
    # Run unit tests
    print("🧪 Running Voice AI unit tests...")
    test_suite = VoiceAITestFramework.create_test_suite()
    runner = unittest.TextTestRunner(verbosity=2)
    test_results = runner.run(test_suite)

    # Run benchmarks
    # benchmark = VoiceAIBenchmark(your_voice_ai_instance)
    # asyncio.run(benchmark.run_latency_benchmark())

    if test_results.wasSuccessful():
        print("✅ All tests passed!")
    else:
        print(f"❌ {len(test_results.failures)} test(s) failed")
        print(f"⚠️ {len(test_results.errors)} error(s) occurred")

Voice AI Setup — 6 FAQs Answered

What is the best voice AI platform to start with?

Start with OpenAI Whisper for the best accuracy ($0.006/min, 99+ languages). Choose Google Cloud Speech for real-time streaming, AWS Transcribe for enterprise AWS deployments, and Azure Speech if you're already in the Microsoft ecosystem.

How much does voice AI cost?

OpenAI Whisper: $0.006/min. Google Cloud: $0.024/min. AWS Transcribe: $0.024/min. Azure Speech: $1.00/hr (about $0.017/min). All four offer free tiers for development.

Can I build a voice AI assistant for free?

Yes. The Web Speech API is free in Chrome (Firefox's speech-recognition support is limited). For production, OpenAI provides $5 in trial credit, and Google and AWS each offer about 60 minutes of free transcription per month.

What programming languages can I use?

Python is the most popular choice; JavaScript/Node.js works well for browser and WebSocket apps. All four platforms provide SDKs for Python, Node.js, Java, Go and .NET.

How do I handle real-time streaming?

Use streaming connections with 20–100 ms audio chunks. Both Google and AWS support streaming recognition (Google over gRPC, AWS over WebSockets/HTTP2). Capture microphone input with PyAudio on the server or the Web Audio API in the browser.
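The chunk sizes above translate directly into byte counts for raw PCM audio. A small illustrative helper makes the arithmetic explicit:

```python
def chunk_size_bytes(sample_rate=16000, sample_width=2, channels=1, chunk_ms=100):
    """Bytes per streaming chunk of raw PCM audio.

    Defaults assume 16 kHz, 16-bit (2-byte) mono — the common STT input format.
    """
    return sample_rate * sample_width * channels * chunk_ms // 1000
```

So a 100 ms chunk at 16 kHz/16-bit mono is 3,200 bytes, a 20 ms chunk is 640 bytes, and a 20 ms telephony chunk (8 kHz, 8-bit μ-law) is only 160 bytes.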

What audio format should I use?

16-bit PCM at 16 kHz is standard. For telephony, 8 kHz μ-law (G.711) is standard. WAV for file processing, MP3/OGG for TTS output.
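Producing that standard format needs nothing beyond Python's stdlib `wave` module. A minimal sketch that writes a 16 kHz, 16-bit mono test tone (the `write_test_wav` helper is illustrative):

```python
import math
import struct
import wave

def write_test_wav(path, sample_rate=16000, duration=0.5, freq=440):
    """Write a mono 16-bit PCM WAV — the format most STT APIs expect."""
    n = int(sample_rate * duration)
    frames = b''.join(
        struct.pack('<h', int(32767 * 0.5 *
                              math.sin(2 * math.pi * freq * i / sample_rate)))
        for i in range(n)
    )
    with wave.open(path, 'wb') as wav:
        wav.setnchannels(1)               # mono
        wav.setsampwidth(2)               # 16-bit samples
        wav.setframerate(sample_rate)     # 16 kHz
        wav.writeframes(frames)
```

Files like this are handy as known-good fixtures when debugging a provider integration, since they rule out microphone and encoding problems.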

Want voice AI without building it yourself? The Team-Connect AI Receptionist handles business calls 24/7 using production voice AI. Try it free.