import numpy as np
import librosa
import matplotlib.pyplot as plt
from scipy import signal
class AudioQualityAnalyzer:
    """Offline audio-quality inspector for speech recordings.

    Loads a file with librosa (downmixed to mono, resampled to
    ``sample_rate``) and reports level, noise, frequency-content, and
    voice-activity metrics plus human-readable tuning recommendations.
    """

    def __init__(self):
        # Target analysis rate; 16 kHz is the common speech-recognition rate.
        self.sample_rate = 16000

    @staticmethod
    def _frame_signal(audio_data, frame_length, hop_length):
        """Slice a 1-D signal into overlapping frames.

        Returns an array of shape ``(frame_length, n_frames)`` — the same
        layout as ``librosa.util.frame`` — so per-frame statistics reduce
        over axis 0.

        NOTE: this replaces the original calls to ``librosa.frame(...)``,
        which raised AttributeError (the function lives at
        ``librosa.util.frame``, not the top-level namespace).
        """
        windows = np.lib.stride_tricks.sliding_window_view(audio_data, frame_length)
        return windows[::hop_length].T

    def analyze_audio_file(self, audio_file_path):
        """Comprehensive audio quality analysis.

        Parameters
        ----------
        audio_file_path : str
            Path to any audio file librosa can decode.

        Returns
        -------
        dict
            File metadata plus signal-quality, frequency, and voice-activity
            metrics, and a ``recommendations`` list of advisory strings.
        """
        # Load audio (librosa resamples and downmixes to mono float32).
        audio_data, sr = librosa.load(audio_file_path, sr=self.sample_rate)
        analysis = {
            'file_path': audio_file_path,
            'duration': len(audio_data) / sr,
            'sample_rate': sr,
            'channels': 1,  # librosa loads as mono by default
            'bit_depth': '32-bit float (loaded)',
            'file_size_mb': len(audio_data) * 4 / (1024 * 1024)  # 4 bytes per float32
        }
        # Signal quality metrics
        analysis.update(self._analyze_signal_quality(audio_data, sr))
        # Frequency analysis
        analysis.update(self._analyze_frequency_content(audio_data, sr))
        # Voice activity detection
        analysis.update(self._detect_voice_activity(audio_data, sr))
        # Recommendations (depend on the metrics merged above)
        analysis['recommendations'] = self._generate_recommendations(analysis)
        return analysis

    def _analyze_signal_quality(self, audio_data, sr):
        """Analyze basic signal quality metrics (level, SNR, clipping)."""
        # RMS (Root Mean Square) - overall loudness
        rms = np.sqrt(np.mean(audio_data**2))
        # Peak amplitude
        peak = np.max(np.abs(audio_data))
        # Crest-factor style dynamic range (peak over RMS), in dB.
        dynamic_range = 20 * np.log10(peak / (rms + 1e-10))
        # SNR estimation: treat the quietest ~10% of 25 ms frames as noise.
        frame_length = int(0.025 * sr)  # 25ms frames
        hop_length = int(0.010 * sr)   # 10ms hop
        frames = self._frame_signal(audio_data, frame_length, hop_length)
        frame_rms = np.sqrt(np.mean(frames**2, axis=0))
        noise_threshold = np.percentile(frame_rms, 10)
        signal_frames = frame_rms[frame_rms > noise_threshold]
        noise_frames = frame_rms[frame_rms <= noise_threshold]
        # Guard empty selections (e.g. constant signals) to avoid NaN means.
        signal_power = np.mean(signal_frames**2) if signal_frames.size else 0.0
        noise_power = np.mean(noise_frames**2) if noise_frames.size else 0.0
        snr = 10 * np.log10(signal_power / (noise_power + 1e-10))
        # Clipping detection: fraction of samples near full scale.
        clipping_threshold = 0.95
        clipped_samples = np.sum(np.abs(audio_data) > clipping_threshold)
        clipping_percentage = (clipped_samples / len(audio_data)) * 100
        return {
            'rms_level': float(rms),
            'peak_amplitude': float(peak),
            'dynamic_range_db': float(dynamic_range),
            'estimated_snr_db': float(snr),
            'clipping_percentage': float(clipping_percentage),
            'is_clipped': bool(clipping_percentage > 0.1)
        }

    def _analyze_frequency_content(self, audio_data, sr):
        """Analyze frequency content for speech optimization."""
        # Compute power spectral density
        frequencies, psd = signal.welch(audio_data, sr, nperseg=1024)
        # Key frequency ranges for speech
        ranges = {
            'fundamental_freq': (80, 300),    # Fundamental frequency range
            'formant_range': (300, 3400),     # Main formant range
            'consonant_range': (2000, 8000),  # Consonant clarity range
            'full_bandwidth': (20, 8000)      # Full audio bandwidth
        }
        frequency_analysis = {}
        for range_name, (low_freq, high_freq) in ranges.items():
            # Nearest PSD bins to the band edges.
            low_idx = np.argmin(np.abs(frequencies - low_freq))
            high_idx = np.argmin(np.abs(frequencies - high_freq))
            # Ensure a non-empty slice so np.mean never sees zero elements.
            band = psd[low_idx:max(high_idx, low_idx + 1)]
            frequency_analysis[f'{range_name}_power'] = float(np.mean(band))
        # Spectral centroid (brightness); epsilon guards all-zero PSD.
        psd_total = np.sum(psd) + 1e-10
        spectral_centroid = np.sum(frequencies * psd) / psd_total
        frequency_analysis['spectral_centroid_hz'] = float(spectral_centroid)
        # Bandwidth (spectral spread around the centroid)
        spectral_spread = np.sqrt(np.sum(((frequencies - spectral_centroid) ** 2) * psd) / psd_total)
        frequency_analysis['spectral_spread_hz'] = float(spectral_spread)
        return frequency_analysis

    def _detect_voice_activity(self, audio_data, sr):
        """Detect voice activity and speech characteristics."""
        # Frame the audio: 25 ms windows, 10 ms hop.
        frame_length = int(0.025 * sr)
        hop_length = int(0.010 * sr)
        # Energy-based VAD
        frames = self._frame_signal(audio_data, frame_length, hop_length)
        frame_energy = np.sum(frames**2, axis=0)
        # Adaptive threshold: bottom 30% of frame energies counts as silence.
        energy_threshold = np.percentile(frame_energy, 30)
        voice_frames = frame_energy > energy_threshold
        # Speech statistics
        total_frames = len(voice_frames)
        speech_frames = np.sum(voice_frames)
        speech_ratio = speech_frames / total_frames if total_frames > 0 else 0
        # Find contiguous speech segments as (start_sec, end_sec) pairs.
        speech_segments = []
        in_speech = False
        segment_start = 0
        for i, is_voice in enumerate(voice_frames):
            if is_voice and not in_speech:
                # Start of speech segment
                segment_start = i * hop_length / sr
                in_speech = True
            elif not is_voice and in_speech:
                # End of speech segment
                speech_segments.append((segment_start, i * hop_length / sr))
                in_speech = False
        # BUG FIX: close a segment that is still open at end-of-audio —
        # the original silently dropped speech running to the last frame.
        if in_speech:
            speech_segments.append((segment_start, total_frames * hop_length / sr))
        return {
            'speech_ratio': float(speech_ratio),
            'total_speech_segments': len(speech_segments),
            'average_segment_length': float(np.mean([end - start for start, end in speech_segments])) if speech_segments else 0,
            'silence_ratio': float(1 - speech_ratio),
            'voice_activity_detected': bool(speech_ratio > 0.1)
        }

    def _generate_recommendations(self, analysis):
        """Generate optimization recommendations based on analysis.

        Pure function of the metrics dict; returns a non-empty list of
        advisory strings.
        """
        recommendations = []
        # Audio level recommendations
        if analysis['rms_level'] < 0.01:
            recommendations.append("⚠️ Audio level too low - increase microphone gain or speak louder")
        elif analysis['rms_level'] > 0.5:
            recommendations.append("⚠️ Audio level too high - reduce microphone gain to prevent clipping")
        # Clipping check
        if analysis['is_clipped']:
            recommendations.append("❌ Audio clipping detected - reduce input gain immediately")
        # SNR recommendations
        if analysis['estimated_snr_db'] < 10:
            recommendations.append("🔇 Low signal-to-noise ratio - use noise cancellation or quieter environment")
        elif analysis['estimated_snr_db'] > 25:
            recommendations.append("✅ Excellent signal-to-noise ratio")
        # Speech content
        if not analysis['voice_activity_detected']:
            recommendations.append("❌ No speech detected - check microphone and speak clearly")
        elif analysis['speech_ratio'] < 0.3:
            recommendations.append("⚠️ Low speech content - too much silence in recording")
        # Frequency content: formants should dominate the fundamental band.
        if analysis['formant_range_power'] < analysis['fundamental_freq_power'] * 0.1:
            recommendations.append("⚠️ Weak formant frequencies - may affect speech recognition")
        # File quality
        if analysis['duration'] < 1.0:
            recommendations.append("⚠️ Very short audio clip - longer samples improve accuracy")
        elif analysis['duration'] > 30.0:
            recommendations.append("💡 Long audio clip - consider chunking for better real-time performance")
        if not recommendations:
            recommendations.append("✅ Audio quality looks good for speech recognition")
        return recommendations

    def plot_analysis(self, analysis, audio_file_path):
        """Create visualization plots for audio analysis.

        Draws waveform, PSD, spectrogram, and a color-coded quality-metric
        bar chart; shows and returns the matplotlib figure.
        """
        # Reload audio for plotting (the analysis dict keeps no samples).
        audio_data, sr = librosa.load(audio_file_path, sr=self.sample_rate)
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Audio Analysis: {audio_file_path}', fontsize=16)
        # Time domain plot
        time = np.linspace(0, len(audio_data) / sr, len(audio_data))
        axes[0, 0].plot(time, audio_data)
        axes[0, 0].set_title('Waveform')
        axes[0, 0].set_xlabel('Time (s)')
        axes[0, 0].set_ylabel('Amplitude')
        axes[0, 0].grid(True)
        # Frequency spectrum
        frequencies, psd = signal.welch(audio_data, sr, nperseg=1024)
        axes[0, 1].semilogx(frequencies, 10 * np.log10(psd))
        axes[0, 1].set_title('Power Spectral Density')
        axes[0, 1].set_xlabel('Frequency (Hz)')
        axes[0, 1].set_ylabel('Power (dB)')
        axes[0, 1].grid(True)
        # Spectrogram (epsilon avoids log of zero bins)
        f, t, Sxx = signal.spectrogram(audio_data, sr)
        axes[1, 0].pcolormesh(t, f, 10 * np.log10(Sxx + 1e-10))
        axes[1, 0].set_title('Spectrogram')
        axes[1, 0].set_xlabel('Time (s)')
        axes[1, 0].set_ylabel('Frequency (Hz)')
        # Quality metrics bar chart
        metrics = {
            'RMS Level': analysis['rms_level'],
            'Peak Amp': analysis['peak_amplitude'],
            'SNR (dB)': analysis['estimated_snr_db'] / 30,  # Normalize for display
            'Speech Ratio': analysis['speech_ratio']
        }
        bars = axes[1, 1].bar(metrics.keys(), metrics.values())
        axes[1, 1].set_title('Quality Metrics')
        axes[1, 1].set_ylabel('Normalized Value')
        axes[1, 1].tick_params(axis='x', rotation=45)
        # Color code bars: red < 0.3 <= orange < 0.7 <= green
        colors = ['red' if v < 0.3 else 'orange' if v < 0.7 else 'green' for v in metrics.values()]
        for bar, color in zip(bars, colors):
            bar.set_color(color)
        plt.tight_layout()
        plt.show()
        return fig
# Usage example
if __name__ == "__main__":
    analyzer = AudioQualityAnalyzer()

    # Run the full analysis pipeline on a sample file.
    report = analyzer.analyze_audio_file("test_audio.wav")

    # Summarize the headline metrics.
    print("=== AUDIO QUALITY ANALYSIS ===")
    print(f"Duration: {report['duration']:.2f}s")
    print(f"RMS Level: {report['rms_level']:.4f}")
    print(f"Peak Amplitude: {report['peak_amplitude']:.4f}")
    print(f"SNR: {report['estimated_snr_db']:.1f} dB")
    print(f"Speech Ratio: {report['speech_ratio']:.1%}")
    print(f"Clipping: {'Yes' if report['is_clipped'] else 'No'}")

    print("\n=== RECOMMENDATIONS ===")
    for recommendation in report['recommendations']:
        print(recommendation)

    # Render the diagnostic plots.
    analyzer.plot_analysis(report, "test_audio.wav")
import asyncio
import time
import random
from typing import Optional, Callable, Any
class RateLimiter:
def __init__(self, max_requests_per_minute: int = 60):
self.max_requests = max_requests_per_minute
self.requests = []
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = time.time()
# Remove requests older than 1 minute
self.requests = [req_time for req_time in self.requests if now - req_time < 60]
if len(self.requests) >= self.max_requests:
# Wait until we can make another request
sleep_time = 60 - (now - self.requests[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
return await self.acquire()
self.requests.append(now)
class RetryHandler:
    """Runs an async callable, retrying failures with exponential backoff.

    Sleeps ``base_delay * 2**attempt`` (capped at ``max_delay``) plus up to
    10% random jitter between attempts; re-raises the last failure once
    ``max_retries`` retries are exhausted.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with exponential backoff retry logic"""
        failure = None
        total_attempts = self.max_retries + 1
        for attempt in range(total_attempts):
            try:
                return await func(*args, **kwargs)
            except Exception as err:
                failure = err
                if attempt + 1 >= total_attempts:
                    # Out of attempts; fall through to the final raise.
                    break
                # Exponential backoff, capped, with up to 10% jitter.
                backoff = min(self.base_delay * (2 ** attempt), self.max_delay)
                pause = backoff + random.uniform(0, 0.1) * backoff
                print(f"⚠️ Attempt {attempt + 1} failed: {err}")
                print(f"🔄 Retrying in {pause:.1f} seconds...")
                await asyncio.sleep(pause)
        # All retries exhausted
        raise failure
class RobustVoiceAI:
    """Transcription front-end combining rate limiting, retries, and
    round-robin provider failover."""

    def __init__(self):
        self.rate_limiter = RateLimiter(max_requests_per_minute=50)
        self.retry_handler = RetryHandler(max_retries=3)
        self.providers = []  # List of voice AI providers
        self.current_provider_index = 0

    async def transcribe_with_resilience(self, audio_data: bytes) -> Optional[str]:
        """Transcribe with rate limiting, retries, and provider failover"""
        # Apply rate limiting before touching any provider.
        await self.rate_limiter.acquire()
        provider_count = len(self.providers)
        # Walk the provider list round-robin until one succeeds.
        for attempt_no in range(provider_count):
            current_provider = self.providers[self.current_provider_index]
            try:
                print(f"🔄 Using provider: {current_provider.name}")
                # Each provider gets the full retry budget.
                return await self.retry_handler.execute_with_retry(
                    self._transcribe_with_provider,
                    current_provider,
                    audio_data,
                )
            except Exception as err:
                print(f"❌ Provider {current_provider.name} failed: {err}")
                # Rotate to the next provider for the following attempt.
                self.current_provider_index = (self.current_provider_index + 1) % provider_count
                if attempt_no + 1 == provider_count:
                    # That was the last provider in the rotation.
                    print("❌ All providers failed")
                    raise err
        # Reached only when self.providers is empty.
        return None

    async def _transcribe_with_provider(self, provider, audio_data: bytes) -> str:
        """Internal method to transcribe with specific provider"""
        # This would call the actual provider's API
        # Implementation depends on the provider (OpenAI, Google, etc.)
        pass
# Network troubleshooting utilities
class NetworkDiagnostics:
    """Connectivity and audio-setup diagnostics for voice AI deployments."""

    @staticmethod
    async def test_connectivity(urls: list) -> dict:
        """Test connectivity to voice AI service endpoints"""
        import aiohttp  # local import: optional dependency, only needed here

        report = {}
        async with aiohttp.ClientSession() as session:
            for endpoint in urls:
                try:
                    started = time.time()
                    async with session.get(endpoint, timeout=10) as response:
                        elapsed_ms = (time.time() - started) * 1000
                        report[endpoint] = {
                            'status': 'success',
                            'status_code': response.status,
                            'latency_ms': round(elapsed_ms, 2),
                        }
                except Exception as err:
                    report[endpoint] = {
                        'status': 'failed',
                        'error': str(err),
                    }
        return report

    @staticmethod
    def diagnose_audio_issues():
        """Diagnose common audio setup issues"""
        issues = []
        recommendations = []
        # Probe PyAudio and enumerate capture-capable devices.
        try:
            import pyaudio

            pa = pyaudio.PyAudio()
            input_devices = []
            for idx in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(idx)
                if info['maxInputChannels'] > 0:
                    input_devices.append(info)
            if input_devices:
                print(f"✅ Found {len(input_devices)} audio input device(s)")
            else:
                issues.append("❌ No audio input devices detected")
                recommendations.append("🔧 Check microphone connection and drivers")
            pa.terminate()
        except ImportError:
            issues.append("❌ PyAudio not installed")
            recommendations.append("🔧 Install PyAudio: pip install pyaudio")
        except Exception as e:
            issues.append(f"❌ PyAudio error: {e}")
            recommendations.append("🔧 Reinstall PyAudio or check audio drivers")
        # Quick reachability probe against a public DNS resolver.
        import socket
        try:
            socket.create_connection(("8.8.8.8", 53), timeout=3)
            print("✅ Internet connectivity OK")
        except OSError:
            issues.append("❌ No internet connectivity")
            recommendations.append("🔧 Check network connection")
        return {
            'issues': issues,
            'recommendations': recommendations,
        }
# Usage example
async def main():
    """Run connectivity tests and audio diagnostics, printing a report."""
    endpoints = [
        'https://api.openai.com',
        'https://speech.googleapis.com',
        'https://transcribe.us-east-1.amazonaws.com',
    ]
    print("🔍 Testing connectivity...")
    connectivity = await NetworkDiagnostics.test_connectivity(endpoints)
    for url, outcome in connectivity.items():
        if outcome['status'] == 'success':
            print(f"✅ {url}: {outcome['latency_ms']}ms")
        else:
            print(f"❌ {url}: {outcome['error']}")

    print("\n🔍 Diagnosing audio setup...")
    audio_diagnosis = NetworkDiagnostics.diagnose_audio_issues()
    if audio_diagnosis['issues']:
        for issue in audio_diagnosis['issues']:
            print(issue)
        print("\n💡 Recommendations:")
        for rec in audio_diagnosis['recommendations']:
            print(rec)


if __name__ == "__main__":
    asyncio.run(main())