- Python
- Python (Async)
- TypeScript
def tts_websocket_flushing(client: Cartesia) -> None:
    """Demonstrates manual flushing to separate audio from different transcripts.

    Pushes two transcripts over a single WebSocket context with a flush in
    between, then writes every received audio chunk to a file selected by the
    chunk's ``flush_id``. Files are named
    ``tts_flush_<flush_id>_<timestamp>.pcm`` and are created in the current
    working directory.

    Args:
        client: Client whose ``tts.websocket_connect()`` opens the connection.

    Any exception raised while receiving or writing propagates to the caller,
    but the output files opened so far are always closed first.
    """
    import datetime

    transcripts = ["Stay hungry, ", "stay foolish."]
    output_format = {"container": "raw", "encoding": "pcm_f32le", "sample_rate": 44100}

    with client.tts.websocket_connect() as connection:
        ctx = connection.context(
            model_id="sonic-3",
            voice={"mode": "id", "id": "6ccbfb76-1fc6-48f7-b71d-91ac6298247b"},
            output_format=output_format,
        )  # Auto-generates context_id

        # 1. Send first transcript
        print("Sending first transcript...")
        ctx.push(transcripts[0])

        # 2. Flush! This forces all buffered audio for the first transcript to be generated
        # and increments the flush_id counter on the server.
        print("Flushing...")
        ctx.flush()

        # 3. Send second transcript
        print("Sending second transcript...")
        ctx.push(transcripts[1])
        ctx.no_more_inputs()

        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

        # We'll save audio to separate files based on flush_id
        files: dict[int, IO[bytes]] = {}
        try:
            for response in ctx.receive():
                # Log every response, but redact audio data to avoid swamping the console.
                loggable = {k: ("[...]" if k == "data" else v) for k, v in response.model_dump().items()}
                print(f"Event: {loggable}")

                if response.type == "chunk" and response.audio:
                    # Get flush_id from response (defaults to 0 if not present)
                    flush_id = response.flush_id or 0
                    if flush_id not in files:
                        filename = f"tts_flush_{flush_id}_{timestamp}.pcm"
                        files[flush_id] = open(filename, "wb")
                    files[flush_id].write(response.audio)
        finally:
            # Close all open files, including when the receive loop fails part-way.
            for f in files.values():
                f.close()

        print("\nFinished.")
        print("You can play the generated audio files with these commands:")
        # The ffplay flags mirror output_format above (pcm_f32le at 44100 Hz).
        for flush_id, f in files.items():
            print(f" Flush ID {flush_id}: ffplay -f f32le -ar 44100 {f.name}")
async def tts_websocket_flushing_async(client: AsyncCartesia) -> None:
    """Demonstrates manual flushing to separate audio from different transcripts.

    Sends two transcripts on one WebSocket context, with an empty-transcript
    request carrying ``flush=True`` in between, then writes every received
    audio chunk to a file selected by the chunk's ``flush_id``. Files are named
    ``tts_flush_async_<flush_id>_<timestamp>.pcm`` and are created in the
    current working directory.

    Args:
        client: Async client whose ``tts.websocket_connect()`` opens the connection.

    Any exception raised while receiving or writing propagates to the caller,
    but the output files opened so far are always closed first.
    """
    import datetime

    transcripts = ["First transcript.", "Second transcript."]
    output_format = {"container": "raw", "encoding": "pcm_f32le", "sample_rate": 44100}

    # Settings repeated on every request sent on this context.
    request_defaults = {
        "model_id": "sonic-3",
        "voice": {"mode": "id", "id": "6ccbfb76-1fc6-48f7-b71d-91ac6298247b"},
        "output_format": output_format,
        "continue_": True,
    }

    async with client.tts.websocket_connect() as connection:
        ctx = connection.context()

        # 1. Send first transcript
        print("Sending first transcript...")
        await ctx.send(transcript=transcripts[0], **request_defaults)

        # 2. Flush! An empty transcript with flush=True carries the flush request.
        print("Flushing...")
        await ctx.send(transcript="", flush=True, **request_defaults)

        # 3. Send second transcript
        print("Sending second transcript...")
        await ctx.send(transcript=transcripts[1], **request_defaults)
        await ctx.no_more_inputs()

        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

        # One output file per flush_id, opened lazily on the first chunk seen for it.
        files: dict[int, IO[bytes]] = {}
        try:
            async for response in ctx.receive():
                if response.type == "chunk" and response.audio:
                    # A missing flush_id is treated as 0.
                    flush_id = response.flush_id or 0
                    if flush_id not in files:
                        filename = f"tts_flush_async_{flush_id}_{timestamp}.pcm"
                        files[flush_id] = open(filename, "wb")
                        print(f"Created new file for flush_id {flush_id}: {filename}")
                    files[flush_id].write(response.audio)
                elif response.type == "flush_done":
                    print(f"Flush done received for flush_id: {response.flush_id}")
        finally:
            # Close all open files, including when the receive loop fails part-way.
            for f in files.values():
                f.close()

        print("\nFinished.")
        print("You can play the generated audio files with these commands:")
        # The ffplay flags mirror output_format above (pcm_f32le at 44100 Hz).
        for flush_id, f in files.items():
            print(f" Flush ID {flush_id}: ffplay -f f32le -ar 44100 {f.name}")
/**
 * Demonstrates manual flushing to separate audio from different transcripts.
 *
 * Pushes two transcripts over one WebSocket context with a flush in between,
 * then writes every received audio chunk to a file selected by the chunk's
 * `flush_id` (`tts_flush_<flushId>_<timestamp>.pcm`).
 *
 * The write streams are ended and the WebSocket is closed even when pushing
 * or receiving throws; the error still propagates to the caller.
 *
 * @param client - Client whose `tts.websocket()` opens the connection.
 */
async function ttsWebsocketFlushing(client: Cartesia): Promise<void> {
  const ws = await client.tts.websocket();
  ws.on('error', (err) => console.error('WS error:', err.message));

  // One write stream per flush_id, created lazily on the first chunk seen for it.
  const files: Map<number, fs.WriteStream> = new Map();

  try {
    const ctx = ws.context({
      model_id: 'sonic-3',
      voice: { mode: 'id', id: '6ccbfb76-1fc6-48f7-b71d-91ac6298247b' },
      output_format: { container: 'raw', encoding: 'pcm_f32le', sample_rate: 44100 },
    });

    // 1. Send first transcript
    console.log('Sending first transcript...');
    await ctx.push({ transcript: 'Stay hungry, ' });

    // 2. Flush — forces all buffered audio for the first transcript to be generated.
    console.log('Flushing...');
    await ctx.flush();

    // 3. Send second transcript
    console.log('Sending second transcript...');
    await ctx.push({ transcript: 'stay foolish.' });
    await ctx.no_more_inputs();

    const ts = timestamp();

    for await (const event of ctx.receive()) {
      // Log every response, but redact audio data to avoid swamping the console.
      const loggable = { ...(event as any) };
      if (loggable.data) loggable.data = '[...]';
      console.log('Event:', JSON.stringify(loggable));

      if (event.type === 'chunk' && event.audio) {
        // A missing flush_id is treated as 0.
        const flushId = (event as any).flush_id ?? 0;
        let out = files.get(flushId);
        if (out === undefined) {
          out = fs.createWriteStream(`tts_flush_${flushId}_${ts}.pcm`);
          files.set(flushId, out);
        }
        out.write(event.audio);
      }
    }
  } finally {
    // Release every stream and the socket on both the success and the error path.
    for (const f of files.values()) f.end();
    ws.close();
  }

  console.log('\nFinished. Play the generated audio files with:');
  // The ffplay flags mirror the output_format above (pcm_f32le at 44100 Hz).
  for (const [flushId, f] of files) {
    console.log(` Flush ID ${flushId}: ffplay -f f32le -ar 44100 ${String(f.path)}`);
  }
}
Run this example
- Python
- Python (Async)
- TypeScript
cd cartesia-python
CARTESIA_API_KEY=YOUR_KEY python3 examples/examples.py tts_websocket_flushing
cd cartesia-python
CARTESIA_API_KEY=YOUR_KEY python3 examples/async_examples.py tts_websocket_flushing_async
cd cartesia-js
CARTESIA_API_KEY=YOUR_KEY npx ts-node examples/node_examples.ts ttsWebsocketFlushing