- Python
- Python (Async)
- TypeScript
- TypeScript (Browser)
def stt_auto_finalize_websocket(client: Cartesia, *args: str) -> None:
    """Realtime STT with native turn detection (recommended for voice agents).

    The model signals when a user turn starts and ends, so your agent reacts
    to events rather than running its own VAD.

    Pass a mono uncompressed PCM WAV file (16-bit or 32-bit) as an argument,
    or call with no args to synthesize sample audio via TTS.

    Args:
        client: Cartesia client used for the STT websocket and, when no WAV
            file is given, for synthesizing the sample audio with TTS.
        *args: Optional positional arguments. Only ``args[0]`` is read; it is
            the path of the WAV file to transcribe.

    Raises:
        SystemExit: With status 1 when the WAV file is not mono, is not
            uncompressed PCM, or has a sample width other than 2 or 4 bytes.
    """
    import sys
    import time
    import wave

    from cartesia.types import STTEncoding, RawOutputFormatParam

    encoding: STTEncoding
    chunks: list[bytes]

    if args:
        with wave.open(args[0], "rb") as wf:
            if wf.getnchannels() != 1:
                print(f"Error: WAV must be mono, got {wf.getnchannels()} channels.")
                sys.exit(1)
            if wf.getcomptype() != "NONE":
                print(f"Error: WAV must be uncompressed PCM, got {wf.getcomptype()!r}.")
                sys.exit(1)

            # The sample width in bytes decides which PCM encoding is declared
            # to the STT endpoint.
            sample_width = wf.getsampwidth()
            if sample_width == 2:
                encoding = "pcm_s16le"
            elif sample_width == 4:
                encoding = "pcm_s32le"
            else:
                print(f"Error: unsupported sample width {sample_width} bytes (expected 2 or 4).")
                sys.exit(1)

            sample_rate = wf.getframerate()
            frames_per_chunk = sample_rate // 10  # 100ms per chunk
            chunks = []
            while True:
                data = wf.readframes(frames_per_chunk)
                if not data:
                    break
                chunks.append(data)
    else:
        output_format: RawOutputFormatParam = {"container": "raw", "encoding": "pcm_s16le", "sample_rate": 16000}
        encoding = output_format["encoding"]
        sample_rate = output_format["sample_rate"]

        generation_transcript = "Hello, world! The quick brown fox jumps over the lazy dog."
        print(f"No WAV file provided — synthesizing audio with TTS: {generation_transcript!r}")
        # Pass the same output_format the STT parameters were read from, so the
        # synthesized audio and the STT session cannot drift apart.
        audio = client.tts.generate(
            model_id="sonic-latest",
            transcript=generation_transcript,
            voice={"mode": "id", "id": "6ccbfb76-1fc6-48f7-b71d-91ac6298247b"},
            output_format=output_format,
            language="en",
        ).read()

        chunk_bytes = (sample_rate * 2) // 10  # 100ms of pcm_s16le (2 bytes/sample)
        chunks = [audio[i : i + chunk_bytes] for i in range(0, len(audio), chunk_bytes)]

    # Concatenate transcripts from all turn.end events to get the full transcript
    # Do not strip or add whitespace!
    full_transcript = ""

    with client.stt.auto_finalize.websocket(
        encoding=encoding,
        model="ink-2",
        sample_rate=sample_rate,
    ) as connection:
        for chunk in chunks:
            connection.send_raw(chunk)
            time.sleep(0.1)  # each chunk is 100ms of audio — pace sends to match real time

        # Flush remaining audio and close the session cleanly.
        connection.send({"type": "close"})

        # All audio has been sent by now; read the events until the
        # connection stops yielding them.
        for event in connection:
            if event.type == "connected":
                print(f"connected | request_id={event.request_id}")
            elif event.type == "turn.start":
                print("turn.start |")
            elif event.type == "turn.update":
                # event.transcript is cumulative within a turn.
                print(f"turn.update | {event.transcript}")
            elif event.type == "turn.eager_end":
                print(f"turn.eager_end | {event.transcript}")
            elif event.type == "turn.resume":
                print("turn.resume |")
            elif event.type == "turn.end":
                print(f"turn.end | {event.transcript}")
                full_transcript += event.transcript
            elif event.type == "error":
                print(f"error | {event.message}")

    print(f"\nFull transcript: {full_transcript!r}")
From cartesia-python/examples/async_examples.py:502
The async version sends audio and receives events concurrently via
asyncio.gather — the realistic pattern for real-time agents.async def stt_auto_finalize_websocket_async(client: AsyncCartesia, *args: str) -> None:
"""Async realtime STT with native turn detection (recommended for voice agents).
The model signals when a user turn starts and ends, so your agent reacts
to events rather than running its own VAD.
Streams audio and receives events concurrently using ``asyncio.gather`` —
the realistic pattern for real-time agents.
Pass a mono uncompressed PCM WAV file (16-bit or 32-bit) as an argument,
or call with no args to synthesize sample audio via TTS.
"""
import sys
import wave
import asyncio
from cartesia.types import STTEncoding, RawOutputFormatParam
encoding: STTEncoding
chunks: list[bytes]
if args:
with wave.open(args[0], "rb") as wf:
if wf.getnchannels() != 1:
print(f"Error: WAV must be mono, got {wf.getnchannels()} channels.")
sys.exit(1)
if wf.getcomptype() != "NONE":
print(f"Error: WAV must be uncompressed PCM, got {wf.getcomptype()!r}.")
sys.exit(1)
sample_width = wf.getsampwidth()
if sample_width == 2:
encoding = "pcm_s16le"
elif sample_width == 4:
encoding = "pcm_s32le"
else:
print(f"Error: unsupported sample width {sample_width} bytes (expected 2 or 4).")
sys.exit(1)
sample_rate = wf.getframerate()
chunks = []
while True:
data = wf.readframes(sample_rate // 10) # 100ms per chunk
if not data:
break
chunks.append(data)
else:
output_format: RawOutputFormatParam = {"container": "raw", "encoding": "pcm_s16le", "sample_rate": 16000}
encoding = output_format["encoding"]
sample_rate = output_format["sample_rate"]
transcript = "Hello, world! The quick brown fox jumps over the lazy dog."
print(f"No WAV file provided — synthesizing audio with TTS: {transcript!r}")
tts_response = await client.tts.generate(
model_id="sonic-latest",
transcript=transcript,
voice={"mode": "id", "id": "6ccbfb76-1fc6-48f7-b71d-91ac6298247b"},
output_format=output_format,
language="en",
)
audio = await tts_response.read()
chunk_bytes = (sample_rate * 2) // 10 # 100ms of pcm_s16le (2 bytes/sample)
chunks = [audio[i : i + chunk_bytes] for i in range(0, len(audio), chunk_bytes)]
# Concatenate transcripts from all turn.end events to get the full transcript
# Do not strip or add whitespace!
full_transcript = ""
async with client.stt.auto_finalize.websocket(
encoding=encoding,
model="ink-2",
sample_rate=sample_rate,
) as connection:
async def send_audio() -> None:
for chunk in chunks:
await connection.send_raw(chunk)
# Pace at real-time (100ms per chunk)
await asyncio.sleep(0.1)
# Flush remaining audio and close the session.
await connection.send({"type": "close"})
async def receive_events() -> None:
async for event in connection:
if event.type == "connected":
print(f"connected | request_id={event.request_id}")
elif event.type == "turn.start":
print("turn.start |")
elif event.type == "turn.update":
# event.transcript is cumulative within a turn.
print(f"turn.update | {event.transcript}")
elif event.type == "turn.eager_end":
print(f"turn.eager_end | {event.transcript}")
elif event.type == "turn.resume":
print("turn.resume |")
elif event.type == "turn.end":
print(f"turn.end | {event.transcript}")
nonlocal full_transcript
full_transcript += event.transcript
elif event.type == "error":
print(f"error | {event.message}")
await asyncio.gather(send_audio(), receive_events())
print(f"\nFull transcript: {full_transcript!r}")
/**
 * Realtime STT with native turn detection, fed with TTS-generated audio.
 *
 * Joins `args` into the text to synthesize (a default sentence is used when
 * `args` is empty), streams the synthesized audio to the STT websocket, logs
 * every event, and prints the concatenation of all `turn.end` transcripts.
 */
async function sttAutoFinalizeWebsocket(client: Cartesia, args: string[]): Promise<void> {
  const encoding = 'pcm_s16le';
  const sampleRate = 16000;
  const text = args.length > 0 ? args.join(' ') : 'The quick brown fox jumps over the lazy dog.';
  console.log(`Generating audio for: ${JSON.stringify(text)}`);

  const socket = client.stt.autoFinalize.websocket({
    model: 'ink-2',
    encoding,
    sample_rate: sampleRate,
  });
  socket.on('error', (err) => console.error('WS error:', err.message));

  // Producer side: synthesize the audio, forward it to the socket, then ask
  // for a graceful close.
  const streamSynthesizedAudio = async (): Promise<void> => {
    const ttsResponse = await client.tts.generate({
      model_id: 'sonic-latest',
      transcript: text,
      voice: { mode: 'id', id: '6ccbfb76-1fc6-48f7-b71d-91ac6298247b' },
      output_format: { container: 'raw', encoding, sample_rate: sampleRate },
      language: 'en',
    });
    if (!ttsResponse.body) throw new Error('TTS response had no body');

    await sendRealtimeAudioChunks(
      ttsResponse.body.getReader(),
      (chunk) => socket.sendRaw(chunk),
      sampleRate,
      encoding,
    );

    // Tells the server to process any buffered audio, then close the socket.
    socket.send({ type: 'close' });
  };
  // Started here (not awaited) so it runs while the events below are consumed.
  const sendTask = streamSynthesizedAudio();

  // Concatenate transcripts from all turn.end events to get the full transcript
  // Do not strip or add whitespace!
  let fullTranscript = '';

  for await (const event of socket.stream()) {
    if (event.type === 'error') {
      console.error(`error | ${event.error.message}`);
      continue;
    }
    if (event.type !== 'message') continue;

    const msg = event.message;
    if (msg.type === 'connected') {
      console.log(`connected | request_id=${msg.request_id}`);
    } else if (msg.type === 'turn.start') {
      console.log('turn.start |');
    } else if (msg.type === 'turn.update') {
      console.log(`turn.update | ${msg.transcript}`);
    } else if (msg.type === 'turn.eager_end') {
      console.log(`turn.eager_end | ${msg.transcript}`);
    } else if (msg.type === 'turn.resume') {
      console.log('turn.resume |');
    } else if (msg.type === 'turn.end') {
      console.log(`turn.end | ${msg.transcript}`);
      fullTranscript += msg.transcript;
    }
  }

  await sendTask;
  console.log(`Full transcript: ${JSON.stringify(fullTranscript)}`);
}
From cartesia-js/examples/browser_examples.ts:282
Captures microphone audio and prints turn events as the user speaks. Wire
stop up to a button (or any UI control) to end the session cleanly; this example stops itself after 30 seconds.

/**
 * Realtime STT with native turn detection, fed from the microphone.
 *
 * Captures microphone audio through an AudioWorklet, streams it to the STT
 * websocket, logs every event, and prints the concatenation of all `turn.end`
 * transcripts. A timer requests a graceful close after 30 seconds.
 *
 * Every setup step runs inside the `try`, so the audio context, the
 * microphone tracks and the timer are released even when setup fails part-way
 * (for example when `getUserMedia` rejects).
 */
async function sttAutoFinalizeWebsocket(client: Cartesia): Promise<void> {
  const audioCtx = new AudioContext();

  // AudioWorklet that forwards mono Float32 frames to the main thread.
  // The template literal stays flush-left so the worklet source text is unchanged.
  const workletSource = `
class PCMCapture extends AudioWorkletProcessor {
process(inputs) {
const ch = inputs[0]?.[0];
if (ch) this.port.postMessage(ch);
return true;
}
}
registerProcessor('pcm-capture', PCMCapture);
`;

  // Declared outside the `try` so `finally` can release whatever was acquired.
  let mediaStream: MediaStream | undefined;
  let source: MediaStreamAudioSourceNode | undefined;
  let capture: AudioWorkletNode | undefined;
  let stopTimer: ReturnType<typeof setTimeout> | undefined;
  let stopped = false;

  try {
    const workletURL = URL.createObjectURL(new Blob([workletSource], { type: 'application/javascript' }));
    try {
      await audioCtx.audioWorklet.addModule(workletURL);
    } finally {
      // Revoke the blob URL whether or not the module loaded.
      URL.revokeObjectURL(workletURL);
    }

    mediaStream = await navigator.mediaDevices.getUserMedia({ audio: true });
    source = audioCtx.createMediaStreamSource(mediaStream);
    capture = new AudioWorkletNode(audioCtx, 'pcm-capture');

    const ws = client.stt.autoFinalize.websocket({
      model: 'ink-2',
      encoding: AUDIO_CONTEXT_ENCODING,
      sample_rate: audioCtx.sampleRate,
    });
    ws.on('error', (err) => console.error(err.message));

    const audioChunks = createFloat32AudioChunker(audioCtx.sampleRate, (chunk) => ws.sendRaw(chunk));

    capture.port.onmessage = (e) => {
      // Drop frames that arrive after a stop was requested.
      if (stopped) return;
      const floats: Float32Array = e.data;
      audioChunks.append(floats);
    };
    source.connect(capture);

    // Sends a graceful close so the server finalizes buffered audio first.
    const stop = () => {
      if (stopped) return;
      stopped = true;
      audioChunks.flush();
      ws.send({ type: 'close' });
    };
    stopTimer = setTimeout(stop, 30_000);

    // Concatenate transcripts from all turn.end events to get the full transcript
    // Do not strip or add whitespace!
    let fullTranscript = '';
    for await (const event of ws.stream()) {
      if (event.type === 'message') {
        const m = event.message;
        switch (m.type) {
          case 'connected':
            console.log(`connected | request_id=${m.request_id}`);
            break;
          case 'turn.start':
            console.log('turn.start |');
            break;
          case 'turn.update':
            console.log(`turn.update | ${m.transcript}`);
            break;
          case 'turn.eager_end':
            console.log(`turn.eager_end | ${m.transcript}`);
            break;
          case 'turn.resume':
            console.log('turn.resume |');
            break;
          case 'turn.end':
            console.log(`turn.end | ${m.transcript}`);
            fullTranscript += m.transcript;
            break;
        }
      } else if (event.type === 'error') {
        console.error(`error | ${event.error.message}`);
      }
    }
    console.log(`Full transcript: ${JSON.stringify(fullTranscript)}`);
  } finally {
    stopped = true;
    if (stopTimer !== undefined) clearTimeout(stopTimer);
    // Optional chaining: these may be unset if setup failed before reaching them.
    source?.disconnect();
    capture?.disconnect();
    mediaStream?.getTracks().forEach((t) => t.stop());
    await audioCtx.close();
  }
}
Run this example
- Python
- Python (Async)
- TypeScript
- TypeScript (Browser)
git clone --branch v3.2.0 https://github.com/cartesia-ai/cartesia-python
cd cartesia-python
uv sync
CARTESIA_API_KEY=YOUR_KEY uv run examples/examples.py stt_auto_finalize_websocket
git clone --branch v3.2.0 https://github.com/cartesia-ai/cartesia-python
cd cartesia-python
uv sync
CARTESIA_API_KEY=YOUR_KEY uv run examples/async_examples.py stt_auto_finalize_websocket_async
git clone --branch v3.2.0 https://github.com/cartesia-ai/cartesia-js
cd cartesia-js
pnpm i
CARTESIA_API_KEY=YOUR_KEY pnpm tsn examples/node_examples.ts sttAutoFinalizeWebsocket
This example runs in the browser. See the Next.js example for a working setup.