Python语音转文字(STT)库实战指南
我将为您提供一个全面的Python语音转文字库实战指南,涵盖主流库的选择、安装、使用和性能对比。
📋 目录
主流STT库概览
本地模型 vs 云端API
Whisper实战教程
SpeechRecognition实战教程
Vosk实战教程
性能对比与选择建议
实战项目示例
1. 主流STT库概览
| 库名称 | 类型 | 支持语言 | 特点 |
|---|---|---|---|
| Whisper (OpenAI) | 本地 | 99种语言 | 高精度,支持多语言,开源 |
| SpeechRecognition | 云端API包装器 | 多种 | 支持多个API(Google, Bing, IBM等) |
| Vosk | 本地 | 20+语言 | 离线可用,轻量级,实时识别 |
| DeepSpeech (Mozilla) | 本地 | 英语为主 | 开源,隐私友好,需要训练 |
| AssemblyAI API | 云端 | 多种 | 专业级API,高级功能 |
| Rev.AI | 云端 | 多种 | 商业级精度,支持批量处理 |
2. 本地模型 vs 云端API
本地模型优势
云端API优势
3. Whisper实战教程
3.1 安装与基础使用
# Install:
#   pip install openai-whisper
# ffmpeg is required: brew install ffmpeg (mac) or sudo apt install ffmpeg (linux)
import whisper

# Load a model; available sizes are tiny, base, small, medium and large.
model = whisper.load_model("base")  # good balance of accuracy and speed

# Transcribe an audio file.
result = model.transcribe("audio.mp3")
print(result["text"])

# Transcription with explicit options.
options = {
    "language": "zh",      # force the language (Chinese)
    "task": "transcribe",  # or "translate" to translate into English
    "fp16": False,         # set False when no GPU is available
    "verbose": True,       # show progress
}
result = model.transcribe("audio.wav", **options)
3.2 高级功能
import whisper
import numpy as np
def transcribe_with_timestamps(audio_path):
    """Transcribe *audio_path* with Whisper and print every segment with its time span."""
    model = whisper.load_model("small")
    result = model.transcribe(audio_path, word_timestamps=True)

    # Emit each recognized segment together with its start/end time.
    for segment in result["segments"]:
        print(f"[{segment['start']:.2f}s - {segment['end']:.2f}s]: {segment['text']}")

    return result
def real_time_transcription(audio_stream, duration=5):
    """Record audio from the default microphone and transcribe it with Whisper.

    Args:
        audio_stream: kept for backward compatibility; currently unused —
            audio is captured directly from the default input device.
        duration: number of seconds to record (default 5; previously hard-coded).

    Returns:
        The transcribed text.
    """
    import pyaudio
    import wave

    # Capture parameters: 16 kHz mono 16-bit PCM, the format Whisper expects.
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print("开始录音...")
    frames = []
    try:
        for _ in range(0, int(RATE / CHUNK * duration)):
            frames.append(stream.read(CHUNK))
    finally:
        # BUG FIX: always release the audio device, even if stream.read raises
        # (the original leaked the stream/PyAudio instance on error).
        stream.stop_stream()
        stream.close()
        p.terminate()

    # Persist the capture so Whisper can read it from disk.
    with wave.open("temp.wav", 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))

    model = whisper.load_model("base")
    result = model.transcribe("temp.wav")
    return result["text"]
4. SpeechRecognition实战教程
4.1 安装与多引擎配置
# 安装
# pip install SpeechRecognition
# 根据需求安装pyaudio: pip install pyaudio
import speech_recognition as sr
def recognize_google_cloud(audio_file):
    """Transcribe *audio_file* via the Google Cloud Speech API."""
    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_file) as source:
        audio_data = recognizer.record(source)
    try:
        # Requires the GOOGLE_APPLICATION_CREDENTIALS environment variable.
        return recognizer.recognize_google_cloud(
            audio_data,
            credentials_json=None,  # or pass the credentials JSON string
            language="zh-CN"
        )
    except sr.UnknownValueError:
        return "无法识别音频"
    except sr.RequestError as e:
        return f"API请求错误: {e}"
def recognize_multiple_apis(audio_file):
    """Try several recognition engines in order and return the first success.

    Falls back through Google, Bing and the offline Sphinx engine; returns a
    fixed failure message when every engine raises.
    """
    r = sr.Recognizer()
    with sr.AudioFile(audio_file) as source:
        audio = r.record(source)

    # (display name, recognizer callable, extra keyword arguments)
    apis = [
        ("Google", r.recognize_google, {"language": "zh-CN"}),
        ("Bing", r.recognize_bing, {"language": "zh-CN", "key": "YOUR_BING_KEY"}),
        ("Sphinx", r.recognize_sphinx, {}),  # offline, English only
    ]
    for api_name, recognizer_func, kwargs in apis:
        try:
            text = recognizer_func(audio, **kwargs)
            print(f"{api_name}: {text}")
            return text
        except Exception:
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; catch Exception instead.
            print(f"{api_name} 识别失败,尝试下一个...")
            continue
    return "所有API都识别失败"
# 实时麦克风输入
def microphone_to_text():
"""从麦克风实时转文字"""
r = sr.Recognizer()
with sr.Microphone() as source:
print("校准环境噪音...")
r.adjust_for_ambient_noise(source, duration=1)
print("请说话...")
try:
audio = r.listen(source, timeout=5, phrase_time_limit=10)
text = r.recognize_google(audio, language="zh-CN")
print(f"识别结果: {text}")
return text
except sr.WaitTimeoutError:
print("超时,未检测到语音")
except sr.UnknownValueError:
print("无法理解音频")
except sr.RequestError as e:
print(f"API错误: {e}")
5. Vosk实战教程
5.1 离线语音识别
# 安装
# pip install vosk
# 下载模型: https://alphacephei.com/vosk/models
import json
import os
from vosk import Model, KaldiRecognizer
import wave
def vosk_offline_recognition(audio_path, model_path="model"):
    """Offline speech recognition with Vosk.

    Args:
        audio_path: path to a mono 16-bit PCM WAV file at 8 or 16 kHz.
        model_path: directory containing an unpacked Vosk model.

    Returns:
        The recognized text joined with spaces, or None when the model is
        missing or the audio format is unsupported.
    """
    if not os.path.exists(model_path):
        print(f"请从 https://alphacephei.com/vosk/models 下载模型并解压到 {model_path}")
        return

    model = Model(model_path)

    # BUG FIX: use a context manager so the WAV handle is closed on every
    # path — the original leaked it when the format check failed.
    with wave.open(audio_path, "rb") as wf:
        if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getframerate() not in [8000, 16000]:
            print("音频格式必须为 WAV mono PCM, 16000 Hz")
            return

        rec = KaldiRecognizer(model, wf.getframerate())
        rec.SetWords(True)  # request per-word timestamps

        results = []
        while True:
            data = wf.readframes(4000)
            if len(data) == 0:
                break
            if rec.AcceptWaveform(data):
                result = json.loads(rec.Result())
                if result.get('text'):
                    results.append(result['text'])

        # Flush the recognizer. BUG FIX: skip an empty final result so the
        # joined text does not end with stray whitespace.
        final_text = json.loads(rec.FinalResult()).get('text', '')
        if final_text:
            results.append(final_text)

    return " ".join(results)
def vosk_real_time():
    """Continuously recognize microphone input with Vosk until Ctrl+C."""
    import pyaudio

    model_path = "vosk-model-small-cn-0.22"  # small Chinese model
    if not os.path.exists(model_path):
        print("请先下载中文模型")
        return

    model = Model(model_path)
    recognizer = KaldiRecognizer(model, 16000.0)

    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=16000,
        input=True,
        frames_per_buffer=8000
    )
    stream.start_stream()
    print("开始实时识别,按Ctrl+C停止...")
    try:
        while True:
            chunk = stream.read(4000, exception_on_overflow=False)
            if not recognizer.AcceptWaveform(chunk):
                # Still mid-utterance: show the partial hypothesis in place.
                partial = json.loads(recognizer.PartialResult())
                if partial['partial']:
                    print(f"实时: {partial['partial']}", end='\r')
            else:
                result = json.loads(recognizer.Result())
                if result['text']:
                    print(f"识别结果: {result['text']}")
    except KeyboardInterrupt:
        print("\n停止识别")
    finally:
        # Always release the audio device.
        stream.stop_stream()
        stream.close()
        audio.terminate()
6. 性能对比与选择建议
6.1 性能对比表
| 指标 | Whisper | SpeechRecognition | Vosk | DeepSpeech |
|---|---|---|---|---|
| 中文识别精度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| 多语言支持 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐ |
| 离线可用性 | ✅ | ❌ | ✅ | ✅ |
| 实时识别 | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 内存占用 | 高 | 低 | 中 | 高 |
| 安装简便性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
| 社区活跃度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
6.2 选择建议
选择 Whisper 当:
- 需要最高识别精度
- 处理多语言内容
- 不要求实时性
- 有GPU可用
选择 SpeechRecognition 当:
- 需要连接多个云API
- 希望快速原型开发
- 不需要离线功能
- 有API密钥预算
选择 Vosk 当:
- 需要完全离线方案
- 实时语音识别
- 资源受限环境
- 对隐私要求极高
7. 实战项目示例
7.1 智能会议记录系统
import whisper
import json
from datetime import datetime
from pathlib import Path
class MeetingTranscriber:
    """Transcribe meeting recordings with Whisper and save structured output."""

    def __init__(self, model_size="small", output_dir="transcripts"):
        # model_size: one of tiny/base/small/medium/large.
        self.model = whisper.load_model(model_size)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def transcribe_meeting(self, audio_path, participants=None):
        """Transcribe one meeting recording.

        Args:
            audio_path: path (str or Path) to the audio file.
            participants: optional list of participant names.

        Returns:
            A dict with "metadata", "segments" and "full_text" keys.
        """
        print(f"开始处理: {audio_path}")
        result = self.model.transcribe(
            str(audio_path),
            language="zh",
            word_timestamps=True,
            verbose=True
        )

        # BUG FIX: store the path as a string — batch_process passes a
        # pathlib.Path here, which json.dump cannot serialize.
        transcript = {
            "metadata": {
                "file": str(audio_path),
                "timestamp": datetime.now().isoformat(),
                "participants": participants or [],
                "duration": result.get("duration", 0),
                "language": result.get("language", "unknown")
            },
            "segments": result.get("segments", []),
            "full_text": result.get("text", "")
        }

        # Save the structured JSON transcript.
        output_file = self.output_dir / f"transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(transcript, f, ensure_ascii=False, indent=2)

        # Also write a human-readable text version next to the JSON.
        text_file = output_file.with_suffix('.txt')
        with open(text_file, 'w', encoding='utf-8') as f:
            f.write("=== 会议转录 ===\n\n")
            f.write(f"时间: {transcript['metadata']['timestamp']}\n")
            f.write(f"时长: {transcript['metadata']['duration']:.2f}秒\n\n")
            f.write("=== 内容 ===\n\n")
            for segment in transcript['segments']:
                start_time = self.format_time(segment['start'])
                f.write(f"[{start_time}] {segment['text']}\n")

        print(f"转录完成!保存至: {output_file}")
        return transcript

    def format_time(self, seconds):
        """Format a second count as an HH:MM:SS string."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def batch_process(self, audio_folder, pattern="*.mp3"):
        """Transcribe every file in *audio_folder* matching *pattern*.

        Returns a list of per-file status dicts; failures are recorded
        instead of aborting the whole batch.
        """
        folder = Path(audio_folder)
        results = []
        for audio_file in folder.glob(pattern):
            try:
                result = self.transcribe_meeting(audio_file)
                results.append({
                    "file": str(audio_file),
                    "success": True,
                    "output": result['metadata']['timestamp']
                })
            except Exception as e:
                results.append({
                    "file": str(audio_file),
                    "success": False,
                    "error": str(e)
                })
        return results
# 使用示例
if __name__ == "__main__":
    transcriber = MeetingTranscriber(model_size="base")

    # Transcribe a single recording.
    result = transcriber.transcribe_meeting(
        "meeting_recording.mp3",
        participants=["张三", "李四", "王五"]
    )

    # Batch mode:
    # results = transcriber.batch_process("recordings/")
7.2 实时字幕生成器
import whisper
import pyaudio
import numpy as np
import threading
import queue
import time
class LiveSubtitleGenerator:
    """Generate live subtitles from microphone audio using Whisper.

    Audio is captured on one daemon thread, buffered through a queue, and
    transcribed in fixed-length windows on a second daemon thread.
    """

    # Capture/processing sample rate expected by Whisper (Hz).
    SAMPLE_RATE = 16000

    def __init__(self, model_size="tiny", language="zh"):
        self.model = whisper.load_model(model_size)
        self.language = language
        self.audio_queue = queue.Queue()
        self.running = False
        self.subtitle_callback = None

    def start(self, subtitle_callback=None):
        """Start the capture and processing threads.

        Args:
            subtitle_callback: callable invoked with each recognized text chunk.
        """
        self.subtitle_callback = subtitle_callback
        self.running = True

        # Capture thread: microphone -> queue.
        capture_thread = threading.Thread(target=self._capture_audio)
        capture_thread.daemon = True
        capture_thread.start()

        # Processing thread: queue -> Whisper -> callback.
        process_thread = threading.Thread(target=self._process_audio)
        process_thread.daemon = True
        process_thread.start()
        print("实时字幕生成器已启动...")

    def stop(self):
        """Signal both worker threads to exit."""
        self.running = False

    def _capture_audio(self):
        """Read microphone audio and push normalized float32 chunks to the queue."""
        CHUNK = 1024 * 2
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = self.SAMPLE_RATE

        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            input=True,
            frames_per_buffer=CHUNK
        )
        while self.running:
            try:
                data = stream.read(CHUNK, exception_on_overflow=False)
                # Convert int16 PCM to float32 in [-1, 1], as Whisper expects.
                audio_array = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
                self.audio_queue.put(audio_array)
            except Exception as e:
                print(f"音频捕获错误: {e}")
        stream.stop_stream()
        stream.close()
        p.terminate()

    def _process_audio(self):
        """Accumulate ~3-second windows from the queue and transcribe them."""
        audio_buffer = []
        buffer_duration = 3  # seconds of audio per transcription window
        # BUG FIX: the original waited for `buffer_duration * 16` samples
        # (48 samples ≈ 3 ms, not 3 s) and then sliced the list with a float
        # index (`... * 1000 // 62.5` -> TypeError). The window size in
        # samples is simply seconds * sample rate.
        window = buffer_duration * self.SAMPLE_RATE

        while self.running:
            try:
                # Collect enough samples for one window; also check
                # self.running so stop() cannot leave this loop spinning.
                while self.running and len(audio_buffer) < window:
                    if not self.audio_queue.empty():
                        audio_buffer.extend(self.audio_queue.get())
                    else:
                        time.sleep(0.01)
                if len(audio_buffer) < window:
                    break  # stopped before a full window was collected

                # float32 keeps the dtype Whisper expects (extend() produced
                # Python floats, which np.array would otherwise make float64).
                audio_segment = np.array(audio_buffer[:window], dtype=np.float32)
                audio_buffer = audio_buffer[window:]

                result = self.model.transcribe(
                    audio_segment,
                    language=self.language,
                    fp16=False
                )
                text = result.get("text", "").strip()
                if text and self.subtitle_callback:
                    self.subtitle_callback(text)
            except Exception as e:
                print(f"处理错误: {e}")
                time.sleep(0.1)
# 使用示例
def print_subtitle(text):
    """Default subtitle callback: echo the recognized text to stdout."""
    print(f"字幕: {text}")


if __name__ == "__main__":
    generator = LiveSubtitleGenerator(model_size="base", language="zh")
    try:
        generator.start(subtitle_callback=print_subtitle)
        time.sleep(60)  # run for 60 seconds
    except KeyboardInterrupt:
        print("停止字幕生成")
    finally:
        generator.stop()
7.3 多引擎聚合服务
from typing import List, Dict, Optional
import asyncio
import aiohttp
import concurrent.futures
import whisper
import speech_recognition as sr
class STTAggregator:
    """Multi-engine speech-to-text aggregator that returns the best result."""

    def __init__(self, config: Dict):
        # config keys: use_whisper, whisper_model, google_api_key, azure_key, language.
        self.config = config
        self.local_models = {}
        # Load local models eagerly so transcription calls don't pay startup cost.
        if config.get('use_whisper', False):
            self.local_models['whisper'] = whisper.load_model(
                config.get('whisper_model', 'base')
            )

    async def transcribe_audio(self, audio_path: str,
                               use_apis: List[str] = None) -> Dict:
        """Run the requested engines in parallel and aggregate their output.

        Args:
            audio_path: path of the audio file to transcribe.
            use_apis: engine names to try (default: whisper, google, azure).

        Returns:
            Dict with 'best_result', 'all_results' and 'confidence'.
        """
        if use_apis is None:
            use_apis = ['whisper', 'google', 'azure']

        # BUG FIX: track engine names alongside the scheduled tasks — the
        # original indexed `use_apis[i]`, which mislabels failures whenever
        # an engine is skipped (e.g. a missing API key shifts the indices).
        engines: List[str] = []
        tasks = []
        if 'whisper' in use_apis and 'whisper' in self.local_models:
            engines.append('whisper')
            tasks.append(self._whisper_transcribe(audio_path))
        if 'google' in use_apis and self.config.get('google_api_key'):
            engines.append('google')
            tasks.append(self._google_transcribe(audio_path))
        if 'azure' in use_apis and self.config.get('azure_key'):
            engines.append('azure')
            tasks.append(self._azure_transcribe(audio_path))

        # Run every engine concurrently; keep exceptions as values.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        valid_results = []
        for engine, result in zip(engines, results):
            if isinstance(result, Exception):
                print(f"引擎 {engine} 失败: {result}")
            elif result and result.get('text'):
                valid_results.append(result)

        best_result = self._select_best_result(valid_results)
        return {
            'best_result': best_result,
            'all_results': valid_results,
            'confidence': self._calculate_confidence(valid_results)
        }

    async def _whisper_transcribe(self, audio_path: str) -> Dict:
        """Run the local Whisper model in a worker thread."""
        # BUG FIX: asyncio.get_event_loop() is deprecated inside coroutines;
        # get_running_loop() is the supported call here.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool,
                self.local_models['whisper'].transcribe,
                audio_path
            )
        return {
            'engine': 'whisper',
            'text': result.get('text', ''),
            'language': result.get('language', ''),
            'segments': result.get('segments', [])
        }

    async def _google_transcribe(self, audio_path: str) -> Dict:
        """Google Web Speech via the SpeechRecognition wrapper."""
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio = recognizer.record(source)
        try:
            text = recognizer.recognize_google(
                audio,
                key=self.config.get('google_api_key'),
                language=self.config.get('language', 'zh-CN')
            )
            return {'engine': 'google', 'text': text}
        except Exception as e:
            # Chain the original cause for easier debugging upstream.
            raise Exception(f"Google识别失败: {e}") from e

    async def _azure_transcribe(self, audio_path: str) -> Dict:
        """Azure Cognitive Services (not yet implemented)."""
        pass

    def _select_best_result(self, results: List[Dict]) -> Dict:
        """Pick the best candidate; current strategy: the longest text."""
        if not results:
            return {'text': '', 'engine': 'none'}
        return max(results, key=lambda x: len(x.get('text', '')))

    def _calculate_confidence(self, results: List[Dict]) -> float:
        """Estimate confidence (0.0-1.0) from cross-engine agreement."""
        if not results:
            return 0.0
        texts = [r.get('text', '') for r in results]
        if len(texts) == 1:
            return 0.7  # single engine: medium confidence
        # Average pairwise similarity across all engine outputs.
        similarities = [
            self._text_similarity(texts[i], texts[j])
            for i in range(len(texts))
            for j in range(i + 1, len(texts))
        ]
        return sum(similarities) / len(similarities) if similarities else 0.0

    def _text_similarity(self, text1: str, text2: str) -> float:
        """Jaccard similarity over whitespace-separated tokens."""
        words1 = set(text1.split())
        words2 = set(text2.split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)
# 使用示例
async def main():
    """Demo: aggregate Whisper and Google results for one recording."""
    config = dict(
        use_whisper=True,
        whisper_model='base',
        google_api_key='your_key_here',
        language='zh-CN',
    )
    aggregator = STTAggregator(config)
    result = await aggregator.transcribe_audio(
        "meeting.wav",
        use_apis=['whisper', 'google']
    )
    print(f"最佳结果: {result['best_result']['text']}")
    print(f"置信度: {result['confidence']:.2%}")
    print(f"所有结果: {[r['engine'] for r in result['all_results']]}")


if __name__ == "__main__":
    asyncio.run(main())
🎯 总结与建议
快速开始方案
初学者/快速原型: 使用 SpeechRecognition + Google Web Speech API(免费,无需密钥)
中文应用/高精度需求: 使用 Whisper(本地,精度高)
离线/实时应用: 使用 Vosk(轻量级,实时性好)
生产环境/商业应用: 考虑专业API(AssemblyAI, Rev.AI)或 Whisper API
性能优化技巧
音频预处理: 确保音频为单声道、16kHz采样率
模型选择: 根据需求平衡精度与速度(Whisper: tiny最快,large最准)
批量处理: 长时间音频可分段处理
GPU加速: 如有NVIDIA GPU,确保安装CUDA版本
注意事项
使用云API时注意数据隐私和合规性
离线模型需要足够的磁盘空间存储模型文件
实时应用要考虑延迟和流式处理
中文识别注意方言和口音问题
这个指南涵盖了从基础使用到高级应用的完整流程,您可以根据具体需求选择合适的方案。