From 4ce5227d572d5120133957d892d0b975b0010c7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asier=20Zunzunegui=20Valc=C3=A1rcel?= Date: Sun, 19 Apr 2026 10:34:05 +0200 Subject: [PATCH] feat(tts): Wyoming service discovery and multi-model seeding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace single PIPER_HOST/PIPER_PORT with PIPER_SERVICES (comma-separated host:port pairs). On worker startup, send describe→info to each endpoint, upsert voice and languages into tts_services, and warn for feeds that reference unknown services. Co-Authored-By: Claude Sonnet 4.6 --- compose.yaml | 4 +-- src/db/index.ts | 14 +++++++-- src/db/schema.ts | 2 ++ src/db/tts-services.ts | 36 ++++++++++++++++++++++- src/services/tts.ts | 59 ++++++++++++++++++++++++++++++++++++++ src/utils/env.ts | 9 ++++-- src/worker/index.ts | 36 ++++++++++++++++++++++- tests/helpers/schema.ts | 2 ++ tests/utils/env.test.ts | 26 ++++------------- tests/worker/index.test.ts | 16 +++++++++++ 10 files changed, 175 insertions(+), 29 deletions(-) diff --git a/compose.yaml b/compose.yaml index 10520e1..cb667da 100644 --- a/compose.yaml +++ b/compose.yaml @@ -9,8 +9,8 @@ services: BASE_URL: ${BASE_URL:-http://localhost:3000} RSS_URL: ${RSS_URL:-} POLL_INTERVAL: ${POLL_INTERVAL:-} - PIPER_HOST: tts - PIPER_PORT: "10200" + PIPER_SERVICES: "tts-english:10200,tts-spanish:10200" + PIPER_VOICE: ${PIPER_VOICE:-en_US-lessac-medium} TTS_TIMEOUT: ${TTS_TIMEOUT:-300} TTS_MAX_RETRIES: ${TTS_MAX_RETRIES:-3} RSS_FETCH_TIMEOUT: ${RSS_FETCH_TIMEOUT:-30000} diff --git a/src/db/index.ts b/src/db/index.ts index ef5cdb0..f101a81 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -14,6 +14,8 @@ CREATE TABLE IF NOT EXISTS tts_services ( name TEXT NOT NULL, host TEXT NOT NULL, port INTEGER NOT NULL, + voice TEXT, + languages TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); `; @@ -75,9 +77,13 @@ let _db: Db | null = null; function seedDefaultTtsService(sqlite: Database.Database): void { const ttsCount = (sqlite.prepare('SELECT COUNT(*) as c FROM tts_services').get() as { c: number }).c; if (ttsCount === 0) { - const host = process.env['PIPER_HOST'] || 'localhost'; - const port = Number(process.env['PIPER_PORT'] || '10200'); - sqlite.prepare('INSERT INTO tts_services (name, host, port) VALUES (?, ?, ?)').run('Default', host, port); + const piperServices = process.env['PIPER_SERVICES']; + if (piperServices) { + const first = piperServices.split(',')[0]!.trim(); + const [host, portStr] = first.split(':'); + const port = Number(portStr || '10200'); + sqlite.prepare('INSERT INTO tts_services (name, host, port) VALUES (?, ?, ?)').run(`${host}:${port}`, host ?? 'localhost', port); + } } } @@ -99,6 +105,8 @@ export function getDb(dbPath?: string): Db { try { _sqlite.exec('ALTER TABLE articles ADD COLUMN tts_elapsed_ms INTEGER'); } catch { /* already exists */ } try { _sqlite.exec('ALTER TABLE articles ADD COLUMN feed_id INTEGER REFERENCES feeds(id)'); } catch { /* already exists */ } + try { _sqlite.exec('ALTER TABLE tts_services ADD COLUMN voice TEXT'); } catch { /* already exists */ } + try { _sqlite.exec('ALTER TABLE tts_services ADD COLUMN languages TEXT'); } catch { /* already exists */ } seedDefaultTtsService(_sqlite); diff --git a/src/db/schema.ts b/src/db/schema.ts index c18e704..a643807 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -7,6 +7,8 @@ export const ttsServices = sqliteTable('tts_services', { name: text('name').notNull(), host: text('host').notNull(), port: integer('port').notNull(), + voice: text('voice'), + languages: text('languages'), created_at: text('created_at').notNull().default(sql`(datetime('now'))`), }); diff --git a/src/db/tts-services.ts b/src/db/tts-services.ts index 9e17408..3818298 100644 --- a/src/db/tts-services.ts +++ b/src/db/tts-services.ts @@ -1,4 +1,4 @@ -import { eq } from 'drizzle-orm'; +import { eq, and } from 'drizzle-orm'; import type { Db } from './index.js'; import { ttsServices } from './schema.js'; @@ -8,6 +8,8 @@ export interface InsertTtsServiceParams { name: string; host: string; port: number; + voice?: string | null; + languages?: string | null; } export type UpdateTtsServiceParams = Partial; @@ -34,3 +36,35 @@ export function deleteTtsService(db: Db, id: number): boolean { const result = db.delete(ttsServices).where(eq(ttsServices.id, id)).run(); return result.changes > 0; } + +export function getTtsServiceByHostPort(db: Db, host: string, port: number) { + return db.select().from(ttsServices).where(and(eq(ttsServices.host, host), eq(ttsServices.port, port))).get(); +} + +export function upsertTtsServiceByHostPort( + db: Db, + host: string, + port: number, + voice: string, + languages: string[], +): import('./schema.js').TtsService { + const existing = getTtsServiceByHostPort(db, host, port); + const languagesJson = JSON.stringify(languages); + if (existing) { + const updated = db + .update(ttsServices) + .set({ voice, languages: languagesJson }) + .where(eq(ttsServices.id, existing.id)) + .returning() + .get(); + if (!updated) throw new Error(`Failed to update TTS service id=${existing.id}`); + return updated; + } + const result = db + .insert(ttsServices) + .values({ name: `${voice} (${host}:${port})`, host, port, voice, languages: languagesJson }) + .returning() + .get(); + if (!result) throw new Error('Failed to insert TTS service'); + return result; +} diff --git a/src/services/tts.ts b/src/services/tts.ts index 9332401..f6c0db4 100644 --- a/src/services/tts.ts +++ b/src/services/tts.ts @@ -258,6 +258,65 @@ export async function synthesise( }); } +export interface TtsServiceInfo { + voice: string; + languages: string[]; +} + +export async function discoverService(host: string, port: number): Promise { + return new Promise((resolve) => { + const socket = new net.Socket(); + let settled = false; + let recvBuf = Buffer.alloc(0); + + const done = (result: TtsServiceInfo | null) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + socket.destroy(); + resolve(result); + }; + + const timeout = setTimeout(() => { + logger.warn(`TTS discover timeout for ${host}:${port}`); + done(null); + }, 10_000); + + socket.setNoDelay(true); + socket.connect(port, host, () => { + socket.write(JSON.stringify({ type: 'describe' }) + '\n'); + }); + + socket.on('data', (chunk: Buffer) => { + if (settled) return; + recvBuf = Buffer.concat([recvBuf, chunk]); + const nlIdx = recvBuf.indexOf(0x0a); + if (nlIdx === -1) return; + const line = recvBuf.subarray(0, nlIdx).toString('utf8').trim(); + try { + const event = JSON.parse(line) as Record; + if (event['type'] !== 'info') return; + const data = event['data'] as Record | undefined; + const tts = (data?.['tts'] as unknown[] | undefined)?.[0] as Record | undefined; + if (!tts) { done(null); return; } + const voice = tts['name'] as string; + const languages = (tts['languages'] as string[] | undefined) ?? []; + done({ voice, languages }); + } catch { + logger.warn(`TTS discover: unexpected response from ${host}:${port} — ${line.slice(0, 120)}`); + done(null); + } + }); + + socket.on('error', (err) => { + logger.warn(`TTS discover connection error for ${host}:${port} — ${err.message}`); + done(null); + }); + + socket.on('end', () => done(null)); + }); +} + /** * Assembles a valid WAV file from raw PCM chunks and audio metadata. * Wyoming audio-chunk payloads are raw little-endian PCM — no WAV header. diff --git a/src/utils/env.ts b/src/utils/env.ts index e449900..eb4fe80 100644 --- a/src/utils/env.ts +++ b/src/utils/env.ts @@ -14,11 +14,16 @@ function num(key: string, defaultValue: number): number { return n; } +function requireStr(key: string): string { + const val = process.env[key]; + if (!val) throw new Error(`Required env var ${key} is not set`); + return val; +} + export const env = { // Worker POLL_INTERVAL: () => str('POLL_INTERVAL'), - PIPER_HOST: () => str('PIPER_HOST', 'localhost'), - PIPER_PORT: () => num('PIPER_PORT', 10200), + PIPER_SERVICES: () => requireStr('PIPER_SERVICES'), TTS_TIMEOUT: () => num('TTS_TIMEOUT', 300) * 1000, TTS_MAX_RETRIES: () => num('TTS_MAX_RETRIES', 3), RSS_FETCH_TIMEOUT: () => num('RSS_FETCH_TIMEOUT', 30000), diff --git a/src/worker/index.ts b/src/worker/index.ts index 68a4119..fa41b88 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -6,8 +6,9 @@ import type { Feed, TtsService } from '../db/index.js'; import { resetFailedRetries, resetConvertingArticles, resetAllArticlesForRegen } from '../db/articles.js'; import { setWorkerStatus } from '../db/worker-state.js'; import { getFeeds } from '../db/feeds.js'; -import { getTtsServiceById } from '../db/tts-services.js'; +import { getTtsServiceById, getTtsServices, upsertTtsServiceByHostPort } from '../db/tts-services.js'; import { processFeed, processPendingArticles } from '../services/rss.js'; +import { discoverService } from '../services/tts.js'; import { runCleanup } from '../services/cleanup.js'; import { env } from '../utils/env.js'; import { logger } from '../utils/logger.js'; @@ -47,6 +48,37 @@ function buildCleanupOpts(feed: Feed) { }; } +async function seedTtsServices(): Promise { + const raw = env.PIPER_SERVICES(); + const endpoints = raw.split(',').map((s: string) => { + const trimmed = s.trim(); + const lastColon = trimmed.lastIndexOf(':'); + const host = trimmed.slice(0, lastColon); + const port = Number(trimmed.slice(lastColon + 1)); + return { host, port }; + }); + + const db = getDb(DB_PATH); + + for (const { host, port } of endpoints) { + const info = await discoverService(host, port); + if (!info) { + logger.warn(`TTS service ${host}:${port} did not respond to discovery — skipping upsert`); + continue; + } + const svc = upsertTtsServiceByHostPort(db, host, port, info.voice, info.languages); + logger.info(`TTS service discovered: ${host}:${port} voice="${info.voice}" languages=${JSON.stringify(info.languages)} id=${svc.id}`); + } + + const knownIds = new Set(getTtsServices(db).map((s) => s.id)); + const feeds = getFeeds(db); + for (const feed of feeds) { + if (!knownIds.has(feed.tts_service_id)) { + logger.warn(`Feed "${feed.name}" references unknown TTS service id=${feed.tts_service_id}`); + } + } +} + let isRunning = false; async function runOnce(isFirst = false): Promise { @@ -180,6 +212,8 @@ async function main(): Promise { // Recover from any prior crash where status was left as 'running'. setWorkerStatus(getDb(DB_PATH), 'idle'); + await seedTtsServices(); + // Run immediately on startup (reset any stale converting articles first) await runOnce(true); diff --git a/tests/helpers/schema.ts b/tests/helpers/schema.ts index 4b07b9c..bd5c192 100644 --- a/tests/helpers/schema.ts +++ b/tests/helpers/schema.ts @@ -4,6 +4,8 @@ CREATE TABLE IF NOT EXISTS tts_services ( name TEXT NOT NULL, host TEXT NOT NULL, port INTEGER NOT NULL, + voice TEXT, + languages TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS feeds ( diff --git a/tests/utils/env.test.ts b/tests/utils/env.test.ts index 75bfd89..4dca9d0 100644 --- a/tests/utils/env.test.ts +++ b/tests/utils/env.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { env } from '../../src/utils/env.ts'; const KEYS = [ - 'POLL_INTERVAL', 'PIPER_HOST', 'PIPER_PORT', 'TTS_TIMEOUT', + 'POLL_INTERVAL', 'PIPER_SERVICES', 'TTS_TIMEOUT', 'TTS_MAX_RETRIES', 'RSS_FETCH_TIMEOUT', 'MAX_AUDIO_FILES', 'PORT', 'BASE_URL', 'UNAVAILABLE_MESSAGE', 'TTS_FAILED_MESSAGE', ]; @@ -31,27 +31,13 @@ describe('env', () => { expect(env.POLL_INTERVAL()).toBe('3600'); }); - it('PIPER_HOST defaults to localhost', () => { - expect(env.PIPER_HOST()).toBe('localhost'); + it('PIPER_SERVICES throws when unset', () => { + expect(() => env.PIPER_SERVICES()).toThrow('PIPER_SERVICES'); }); - it('PIPER_HOST returns value when set', () => { - process.env.PIPER_HOST = 'tts-server'; - expect(env.PIPER_HOST()).toBe('tts-server'); - }); - - it('PIPER_PORT defaults to 10200', () => { - expect(env.PIPER_PORT()).toBe(10200); - }); - - it('PIPER_PORT returns numeric value when set', () => { - process.env.PIPER_PORT = '9999'; - expect(env.PIPER_PORT()).toBe(9999); - }); - - it('PIPER_PORT throws on non-numeric value', () => { - process.env.PIPER_PORT = 'abc'; - expect(() => env.PIPER_PORT()).toThrow('PIPER_PORT must be a number'); + it('PIPER_SERVICES returns value when set', () => { + process.env.PIPER_SERVICES = 'localhost:10200,tts-2:10201'; + expect(env.PIPER_SERVICES()).toBe('localhost:10200,tts-2:10201'); }); it('TTS_TIMEOUT defaults to 300000ms', () => { diff --git a/tests/worker/index.test.ts b/tests/worker/index.test.ts index 7af4cc7..5f7e832 100644 --- a/tests/worker/index.test.ts +++ b/tests/worker/index.test.ts @@ -28,6 +28,11 @@ const mocks = vi.hoisted(() => ({ }, ttsServices: { getTtsServiceById: vi.fn(() => null as ReturnType), + getTtsServices: vi.fn(() => [] as ReturnType), + upsertTtsServiceByHostPort: vi.fn(() => ({ id: 1, name: 'svc', host: 'localhost', port: 10200, voice: 'v', languages: '["en"]' } as ReturnType)), + }, + tts: { + discoverService: vi.fn().mockResolvedValue({ voice: 'v', languages: ['en'] }), }, rss: { processFeed: vi.fn().mockResolvedValue(undefined), @@ -38,6 +43,7 @@ const mocks = vi.hoisted(() => ({ }, env: { POLL_INTERVAL: vi.fn(() => undefined as string | undefined), + PIPER_SERVICES: vi.fn(() => 'localhost:10200'), TTS_MAX_RETRIES: vi.fn(() => 3), TTS_TIMEOUT: vi.fn(() => 300_000), RSS_FETCH_TIMEOUT: vi.fn(() => 30_000), @@ -89,6 +95,12 @@ vi.mock('../../src/db/feeds.js', () => ({ vi.mock('../../src/db/tts-services.js', () => ({ getTtsServiceById: mocks.ttsServices.getTtsServiceById, + getTtsServices: mocks.ttsServices.getTtsServices, + upsertTtsServiceByHostPort: mocks.ttsServices.upsertTtsServiceByHostPort, +})); + +vi.mock('../../src/services/tts.js', () => ({ + discoverService: mocks.tts.discoverService, })); vi.mock('../../src/services/rss.js', () => ({ @@ -136,9 +148,13 @@ beforeEach(() => { mocks.articles.resetAllArticlesForRegen.mockReturnValue(0); mocks.feeds.getFeeds.mockReturnValue([]); mocks.ttsServices.getTtsServiceById.mockReturnValue(null); + mocks.ttsServices.getTtsServices.mockReturnValue([]); + mocks.ttsServices.upsertTtsServiceByHostPort.mockReturnValue({ id: 1, name: 'svc', host: 'localhost', port: 10200, voice: 'v', languages: '["en"]' } as ReturnType); + mocks.tts.discoverService.mockResolvedValue({ voice: 'v', languages: ['en'] }); mocks.rss.processFeed.mockResolvedValue(undefined); mocks.rss.processPendingArticles.mockResolvedValue(undefined); mocks.env.POLL_INTERVAL.mockReturnValue(undefined); + mocks.env.PIPER_SERVICES.mockReturnValue('localhost:10200'); mocks.env.TTS_MAX_RETRIES.mockReturnValue(3); mocks.env.TTS_TIMEOUT.mockReturnValue(300_000); mocks.env.RSS_FETCH_TIMEOUT.mockReturnValue(30_000);