Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ services:
BASE_URL: ${BASE_URL:-http://localhost:3000}
RSS_URL: ${RSS_URL:-}
POLL_INTERVAL: ${POLL_INTERVAL:-}
PIPER_HOST: tts
PIPER_PORT: "10200"
PIPER_SERVICES: "tts-english:10200,tts-spanish:10200"
PIPER_VOICE: ${PIPER_VOICE:-en_US-lessac-medium}
TTS_TIMEOUT: ${TTS_TIMEOUT:-300}
TTS_MAX_RETRIES: ${TTS_MAX_RETRIES:-3}
RSS_FETCH_TIMEOUT: ${RSS_FETCH_TIMEOUT:-30000}
Expand Down
14 changes: 11 additions & 3 deletions src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ CREATE TABLE IF NOT EXISTS tts_services (
name TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL,
voice TEXT,
languages TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`;
Expand Down Expand Up @@ -75,9 +77,13 @@ let _db: Db | null = null;
function seedDefaultTtsService(sqlite: Database.Database): void {
const ttsCount = (sqlite.prepare('SELECT COUNT(*) as c FROM tts_services').get() as { c: number }).c;
if (ttsCount === 0) {
const host = process.env['PIPER_HOST'] || 'localhost';
const port = Number(process.env['PIPER_PORT'] || '10200');
sqlite.prepare('INSERT INTO tts_services (name, host, port) VALUES (?, ?, ?)').run('Default', host, port);
const piperServices = process.env['PIPER_SERVICES'];
if (piperServices) {
const first = piperServices.split(',')[0]!.trim();
const [host, portStr] = first.split(':');
const port = Number(portStr || '10200');
sqlite.prepare('INSERT INTO tts_services (name, host, port) VALUES (?, ?, ?)').run(`${host}:${port}`, host ?? 'localhost', port);
}
}
}

Expand All @@ -99,6 +105,8 @@ export function getDb(dbPath?: string): Db {

try { _sqlite.exec('ALTER TABLE articles ADD COLUMN tts_elapsed_ms INTEGER'); } catch { /* already exists */ }
try { _sqlite.exec('ALTER TABLE articles ADD COLUMN feed_id INTEGER REFERENCES feeds(id)'); } catch { /* already exists */ }
try { _sqlite.exec('ALTER TABLE tts_services ADD COLUMN voice TEXT'); } catch { /* already exists */ }
try { _sqlite.exec('ALTER TABLE tts_services ADD COLUMN languages TEXT'); } catch { /* already exists */ }

seedDefaultTtsService(_sqlite);

Expand Down
2 changes: 2 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export const ttsServices = sqliteTable('tts_services', {
name: text('name').notNull(),
host: text('host').notNull(),
port: integer('port').notNull(),
voice: text('voice'),
languages: text('languages'),
created_at: text('created_at').notNull().default(sql`(datetime('now'))`),
});

Expand Down
36 changes: 35 additions & 1 deletion src/db/tts-services.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { eq } from 'drizzle-orm';
import { eq, and } from 'drizzle-orm';
import type { Db } from './index.js';
import { ttsServices } from './schema.js';

Expand All @@ -8,6 +8,8 @@ export interface InsertTtsServiceParams {
name: string;
host: string;
port: number;
voice?: string | null;
languages?: string | null;
}

export type UpdateTtsServiceParams = Partial<InsertTtsServiceParams>;
Expand All @@ -34,3 +36,35 @@ export function deleteTtsService(db: Db, id: number): boolean {
const result = db.delete(ttsServices).where(eq(ttsServices.id, id)).run();
return result.changes > 0;
}

export function getTtsServiceByHostPort(db: Db, host: string, port: number) {
return db.select().from(ttsServices).where(and(eq(ttsServices.host, host), eq(ttsServices.port, port))).get();
}

export function upsertTtsServiceByHostPort(
db: Db,
host: string,
port: number,
voice: string,
languages: string[],
): import('./schema.js').TtsService {
const existing = getTtsServiceByHostPort(db, host, port);
const languagesJson = JSON.stringify(languages);
if (existing) {
const updated = db
.update(ttsServices)
.set({ voice, languages: languagesJson })
.where(eq(ttsServices.id, existing.id))
.returning()
.get();
if (!updated) throw new Error(`Failed to update TTS service id=${existing.id}`);
return updated;
}
const result = db
.insert(ttsServices)
.values({ name: `${voice} (${host}:${port})`, host, port, voice, languages: languagesJson })
.returning()
.get();
if (!result) throw new Error('Failed to insert TTS service');
return result;
}
59 changes: 59 additions & 0 deletions src/services/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,65 @@ export async function synthesise(
});
}

export interface TtsServiceInfo {
voice: string;
languages: string[];
}

export async function discoverService(host: string, port: number): Promise<TtsServiceInfo | null> {
return new Promise((resolve) => {
const socket = new net.Socket();
let settled = false;
let recvBuf = Buffer.alloc(0);

const done = (result: TtsServiceInfo | null) => {
if (settled) return;
settled = true;
clearTimeout(timeout);
socket.destroy();
resolve(result);
};

const timeout = setTimeout(() => {
logger.warn(`TTS discover timeout for ${host}:${port}`);
done(null);
}, 10_000);

socket.setNoDelay(true);
socket.connect(port, host, () => {
socket.write(JSON.stringify({ type: 'describe' }) + '\n');
});

socket.on('data', (chunk: Buffer) => {
if (settled) return;
recvBuf = Buffer.concat([recvBuf, chunk]);
const nlIdx = recvBuf.indexOf(0x0a);
if (nlIdx === -1) return;
const line = recvBuf.subarray(0, nlIdx).toString('utf8').trim();
try {
const event = JSON.parse(line) as Record<string, unknown>;
if (event['type'] !== 'info') return;
const data = event['data'] as Record<string, unknown> | undefined;
const tts = (data?.['tts'] as unknown[] | undefined)?.[0] as Record<string, unknown> | undefined;
if (!tts) { done(null); return; }
const voice = tts['name'] as string;
const languages = (tts['languages'] as string[] | undefined) ?? [];
done({ voice, languages });
} catch {
logger.warn(`TTS discover: unexpected response from ${host}:${port} — ${line.slice(0, 120)}`);
done(null);
}
});

socket.on('error', (err) => {
logger.warn(`TTS discover connection error for ${host}:${port} — ${err.message}`);
done(null);
});

socket.on('end', () => done(null));
});
}

/**
* Assembles a valid WAV file from raw PCM chunks and audio metadata.
* Wyoming audio-chunk payloads are raw little-endian PCM — no WAV header.
Expand Down
9 changes: 7 additions & 2 deletions src/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ function num(key: string, defaultValue: number): number {
return n;
}

function requireStr(key: string): string {
const val = process.env[key];
if (!val) throw new Error(`Required env var ${key} is not set`);
return val;
}

export const env = {
// Worker
POLL_INTERVAL: () => str('POLL_INTERVAL'),
PIPER_HOST: () => str('PIPER_HOST', 'localhost'),
PIPER_PORT: () => num('PIPER_PORT', 10200),
PIPER_SERVICES: () => requireStr('PIPER_SERVICES'),
TTS_TIMEOUT: () => num('TTS_TIMEOUT', 300) * 1000,
TTS_MAX_RETRIES: () => num('TTS_MAX_RETRIES', 3),
RSS_FETCH_TIMEOUT: () => num('RSS_FETCH_TIMEOUT', 30000),
Expand Down
36 changes: 35 additions & 1 deletion src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import type { Feed, TtsService } from '../db/index.js';
import { resetFailedRetries, resetConvertingArticles, resetAllArticlesForRegen } from '../db/articles.js';
import { setWorkerStatus } from '../db/worker-state.js';
import { getFeeds } from '../db/feeds.js';
import { getTtsServiceById } from '../db/tts-services.js';
import { getTtsServiceById, getTtsServices, upsertTtsServiceByHostPort } from '../db/tts-services.js';
import { processFeed, processPendingArticles } from '../services/rss.js';
import { discoverService } from '../services/tts.js';
import { runCleanup } from '../services/cleanup.js';
import { env } from '../utils/env.js';
import { logger } from '../utils/logger.js';
Expand Down Expand Up @@ -47,6 +48,37 @@ function buildCleanupOpts(feed: Feed) {
};
}

async function seedTtsServices(): Promise<void> {
const raw = env.PIPER_SERVICES();
const endpoints = raw.split(',').map((s: string) => {
const trimmed = s.trim();
const lastColon = trimmed.lastIndexOf(':');
const host = trimmed.slice(0, lastColon);
const port = Number(trimmed.slice(lastColon + 1));
return { host, port };
});

const db = getDb(DB_PATH);

for (const { host, port } of endpoints) {
const info = await discoverService(host, port);
if (!info) {
logger.warn(`TTS service ${host}:${port} did not respond to discovery — skipping upsert`);
continue;
}
const svc = upsertTtsServiceByHostPort(db, host, port, info.voice, info.languages);
logger.info(`TTS service discovered: ${host}:${port} voice="${info.voice}" languages=${JSON.stringify(info.languages)} id=${svc.id}`);
}

const knownIds = new Set(getTtsServices(db).map((s) => s.id));
const feeds = getFeeds(db);
for (const feed of feeds) {
if (!knownIds.has(feed.tts_service_id)) {
logger.warn(`Feed "${feed.name}" references unknown TTS service id=${feed.tts_service_id}`);
}
}
}

let isRunning = false;

async function runOnce(isFirst = false): Promise<void> {
Expand Down Expand Up @@ -180,6 +212,8 @@ async function main(): Promise<void> {
// Recover from any prior crash where status was left as 'running'.
setWorkerStatus(getDb(DB_PATH), 'idle');

await seedTtsServices();

// Run immediately on startup (reset any stale converting articles first)
await runOnce(true);

Expand Down
2 changes: 2 additions & 0 deletions tests/helpers/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ CREATE TABLE IF NOT EXISTS tts_services (
name TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL,
voice TEXT,
languages TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS feeds (
Expand Down
26 changes: 6 additions & 20 deletions tests/utils/env.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { env } from '../../src/utils/env.ts';

const KEYS = [
'POLL_INTERVAL', 'PIPER_HOST', 'PIPER_PORT', 'TTS_TIMEOUT',
'POLL_INTERVAL', 'PIPER_SERVICES', 'TTS_TIMEOUT',
'TTS_MAX_RETRIES', 'RSS_FETCH_TIMEOUT', 'MAX_AUDIO_FILES',
'PORT', 'BASE_URL', 'UNAVAILABLE_MESSAGE', 'TTS_FAILED_MESSAGE',
];
Expand Down Expand Up @@ -31,27 +31,13 @@ describe('env', () => {
expect(env.POLL_INTERVAL()).toBe('3600');
});

it('PIPER_HOST defaults to localhost', () => {
expect(env.PIPER_HOST()).toBe('localhost');
it('PIPER_SERVICES throws when unset', () => {
expect(() => env.PIPER_SERVICES()).toThrow('PIPER_SERVICES');
});

it('PIPER_HOST returns value when set', () => {
process.env.PIPER_HOST = 'tts-server';
expect(env.PIPER_HOST()).toBe('tts-server');
});

it('PIPER_PORT defaults to 10200', () => {
expect(env.PIPER_PORT()).toBe(10200);
});

it('PIPER_PORT returns numeric value when set', () => {
process.env.PIPER_PORT = '9999';
expect(env.PIPER_PORT()).toBe(9999);
});

it('PIPER_PORT throws on non-numeric value', () => {
process.env.PIPER_PORT = 'abc';
expect(() => env.PIPER_PORT()).toThrow('PIPER_PORT must be a number');
it('PIPER_SERVICES returns value when set', () => {
process.env.PIPER_SERVICES = 'localhost:10200,tts-2:10201';
expect(env.PIPER_SERVICES()).toBe('localhost:10200,tts-2:10201');
});

it('TTS_TIMEOUT defaults to 300000ms', () => {
Expand Down
16 changes: 16 additions & 0 deletions tests/worker/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ const mocks = vi.hoisted(() => ({
},
ttsServices: {
getTtsServiceById: vi.fn(() => null as ReturnType<typeof import('../../src/db/tts-services.js').getTtsServiceById>),
getTtsServices: vi.fn(() => [] as ReturnType<typeof import('../../src/db/tts-services.js').getTtsServices>),
upsertTtsServiceByHostPort: vi.fn(() => ({ id: 1, name: 'svc', host: 'localhost', port: 10200, voice: 'v', languages: '["en"]' } as ReturnType<typeof import('../../src/db/tts-services.js').upsertTtsServiceByHostPort>)),
},
tts: {
discoverService: vi.fn().mockResolvedValue({ voice: 'v', languages: ['en'] }),
},
rss: {
processFeed: vi.fn().mockResolvedValue(undefined),
Expand All @@ -38,6 +43,7 @@ const mocks = vi.hoisted(() => ({
},
env: {
POLL_INTERVAL: vi.fn(() => undefined as string | undefined),
PIPER_SERVICES: vi.fn(() => 'localhost:10200'),
TTS_MAX_RETRIES: vi.fn(() => 3),
TTS_TIMEOUT: vi.fn(() => 300_000),
RSS_FETCH_TIMEOUT: vi.fn(() => 30_000),
Expand Down Expand Up @@ -89,6 +95,12 @@ vi.mock('../../src/db/feeds.js', () => ({

vi.mock('../../src/db/tts-services.js', () => ({
getTtsServiceById: mocks.ttsServices.getTtsServiceById,
getTtsServices: mocks.ttsServices.getTtsServices,
upsertTtsServiceByHostPort: mocks.ttsServices.upsertTtsServiceByHostPort,
}));

vi.mock('../../src/services/tts.js', () => ({
discoverService: mocks.tts.discoverService,
}));

vi.mock('../../src/services/rss.js', () => ({
Expand Down Expand Up @@ -136,9 +148,13 @@ beforeEach(() => {
mocks.articles.resetAllArticlesForRegen.mockReturnValue(0);
mocks.feeds.getFeeds.mockReturnValue([]);
mocks.ttsServices.getTtsServiceById.mockReturnValue(null);
mocks.ttsServices.getTtsServices.mockReturnValue([]);
mocks.ttsServices.upsertTtsServiceByHostPort.mockReturnValue({ id: 1, name: 'svc', host: 'localhost', port: 10200, voice: 'v', languages: '["en"]' } as ReturnType<typeof import('../../src/db/tts-services.js').upsertTtsServiceByHostPort>);
mocks.tts.discoverService.mockResolvedValue({ voice: 'v', languages: ['en'] });
mocks.rss.processFeed.mockResolvedValue(undefined);
mocks.rss.processPendingArticles.mockResolvedValue(undefined);
mocks.env.POLL_INTERVAL.mockReturnValue(undefined);
mocks.env.PIPER_SERVICES.mockReturnValue('localhost:10200');
mocks.env.TTS_MAX_RETRIES.mockReturnValue(3);
mocks.env.TTS_TIMEOUT.mockReturnValue(300_000);
mocks.env.RSS_FETCH_TIMEOUT.mockReturnValue(30_000);
Expand Down