From a98d951d57f76a04295648f3707adea4ee729d02 Mon Sep 17 00:00:00 2001 From: idrimi Date: Wed, 11 Mar 2026 13:22:29 +0100 Subject: [PATCH] cleanup snapcast service and ad scroll down refresh to dashboard --- src/app/pages/dashboard/dashboard.page.html | 18 +- src/app/pages/dashboard/dashboard.page.scss | 11 +- src/app/pages/dashboard/dashboard.page.ts | 34 +- src/app/services/snapcast.service.ts | 801 +++++++++----------- src/theme/variables.scss | 3 + 5 files changed, 420 insertions(+), 447 deletions(-) diff --git a/src/app/pages/dashboard/dashboard.page.html b/src/app/pages/dashboard/dashboard.page.html index 8dbd8eb..6fbdb6c 100644 --- a/src/app/pages/dashboard/dashboard.page.html +++ b/src/app/pages/dashboard/dashboard.page.html @@ -1,15 +1,25 @@ - + Dashboard - + + + + + + +
- {{today | date:"EEEE, d.MMMM"}} - +
+ {{today | date:"EEEE, d.MMMM"}} + +
+ {{name ? name+"s": 'Your'}} Home diff --git a/src/app/pages/dashboard/dashboard.page.scss b/src/app/pages/dashboard/dashboard.page.scss index 0cf520f..4e83d46 100644 --- a/src/app/pages/dashboard/dashboard.page.scss +++ b/src/app/pages/dashboard/dashboard.page.scss @@ -13,12 +13,14 @@ text-transform: uppercase; font-weight: 400; font-size: 12px; - position: absolute; - top: -10px; + // position: ; + // top: -10px; + // left:0; z-index: 1000; margin: 0; margin-left: 18px; - margin-bottom: 16px; + margin-bottom: -12px; + line-height: 0; } @@ -65,4 +67,5 @@ color: var(--ion-color-medium); z-index: 2000; - } \ No newline at end of file + } + diff --git a/src/app/pages/dashboard/dashboard.page.ts b/src/app/pages/dashboard/dashboard.page.ts index 60f2612..6a09d4c 100644 --- a/src/app/pages/dashboard/dashboard.page.ts +++ b/src/app/pages/dashboard/dashboard.page.ts @@ -1,5 +1,5 @@ import { Component, HostListener, OnInit } from '@angular/core'; -import { ModalController } from '@ionic/angular'; +import { ModalController, RefresherCustomEvent } from '@ionic/angular'; import { first, firstValueFrom, interval, Observable } from 'rxjs'; import { Client, Group, ServerDetail, SnapCastServerStatusResponse, Stream } from 'src/app/model/snapcast.model'; import { SnapcastService } from 'src/app/services/snapcast.service'; @@ -71,6 +71,8 @@ export class DashboardPage implements OnInit { speakerData: Speaker[] = []; + refreshingText: string = 'Refreshing... '; + constructor( @@ -242,6 +244,36 @@ export class DashboardPage implements OnInit { }); } + handleRefresh(event: RefresherCustomEvent) { + this.refreshSeverState(event); + + } + + refreshSeverState(event: RefresherCustomEvent): void { + console.log('Refreshing server state...'); + this.snapcastService.refreshState().subscribe({ + next: () => { + console.log('Server state refreshed successfully'); + const now = new Date(); + setTimeout(() => { this.refreshingText = 'Server state refreshed successfully at ' + now.toLocaleTimeString(); + }, 500); + // this.refreshingText = 'Server state refreshed successfully at ' + now.toLocaleTimeString(); + // event.target.complete(); + setTimeout(() => { + event.target.complete(); + this.refreshingText = 'Refreshing... '; + }, 1500); + }, + error: (error) => { + console.error('Error refreshing server state:', error); + this.refreshingText = 'Error refreshing server state'; + setTimeout(() => { + event.target.complete(); + }, 1500); + } + }); + } + diff --git a/src/app/services/snapcast.service.ts b/src/app/services/snapcast.service.ts index 963e6bc..103a124 100644 --- a/src/app/services/snapcast.service.ts +++ b/src/app/services/snapcast.service.ts @@ -1,24 +1,28 @@ import { Injectable, OnDestroy } from '@angular/core'; +import { HttpClient } from '@angular/common/http'; import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; import { + BehaviorSubject, + Observable, Subject, Subscription, - Observable, + catchError, + distinctUntilChanged, filter, map, - shareReplay, - catchError, take, + tap, throwError, - distinctUntilChanged, - switchMap, timer, EMPTY, - BehaviorSubject, + shareReplay, } from 'rxjs'; import { produce } from 'immer'; +import { Preferences } from '@capacitor/preferences'; +import { environment } from 'src/environments/environment'; +import { UserPreference } from '../enum/user-preference.enum'; -// --- Model Imports (ensure paths are correct) --- +// --- Model Imports --- import { SnapCastServerStatusResponse, Group, @@ -26,538 +30,459 @@ import { Stream, ServerDetail, Volume, - // Other models you might need for action methods - StreamControlCommandType, - StreamControlCommandSpecificParams, - StreamControlRpcPayloadParams, StreamLoopStatus, + StreamControlRpcPayloadParams, StreamSetPropertyRpcPayloadParams, + Server as SnapServerModel // Alias to avoid confusion with ServerDetail } from '../model/snapcast.model'; + import { SnapcastWebsocketNotification, + // Notification Payloads ClientVolumeChange, GroupMuteChange, GroupNameChange, GroupStreamChange, ServerOnUpdate as ServerOnUpdateNotificationParams, StreamOnProperties, StreamOnUpdate, ClientOnConnect, ClientOnDisconnect } from '../model/snapcast-websocket-notification.model'; -import { environment } from 'src/environments/environment'; -import { Preferences } from '@capacitor/preferences'; -import { UserPreference } from '../enum/user-preference.enum'; -import { HttpClient } from '@angular/common/http'; // --- Type Definitions --- -interface JsonRpcBaseRequest { jsonrpc: '2.0'; id: number; method: string; } -interface JsonRpcRequest

extends JsonRpcBaseRequest { params?: P; } -interface JsonRpcResponse { jsonrpc: '2.0'; id: number; result?: R; error?: { code: number; message: string }; } -type SnapcastWebSocketMessage = JsonRpcResponse | SnapcastWebsocketNotification | { error: string; data?: unknown; message?: string } | { [key: string]: any }; - -@Injectable({ providedIn: 'root' }) -export class SnapcastService implements OnDestroy { - private readonly DEFAULT_HOSTNAME = environment.snapcastServerUrl; - private readonly DEFAULT_PORT = 1780; - private readonly RECONNECT_INTERVAL_MS = 5000; - private readonly SERVER_STATUS_REFRESH_INTERVAL_MS = 10000; - - private USERPREFERENCE_HOSTNAME?: string; +interface JsonRpcRequest

{ + jsonrpc: '2.0'; + id: number; + method: string; + params?: P; +} + +interface JsonRpcResponse { + jsonrpc: '2.0'; + id: number; + result?: R; + error?: { code: number; message: string }; +} + +// Strict RPC Method Registry +export interface SnapcastRpcMethods { + 'Server.GetStatus': { params: void; result: SnapCastServerStatusResponse }; + 'Server.GetRpcVersion': { params: void; result: { major: number; minor: number; patch: number } }; + 'Server.DeleteClient': { params: { id: string }; result: void }; + + 'Client.SetVolume': { params: { id: string; volume: Volume }; result: void }; + 'Client.SetName': { params: { id: string; name: string }; result: void }; + 'Client.SetLatency': { params: { id: string; latency: number }; result: void }; + 'Client.GetStatus': { params: { id: string }; result: Client }; + + 'Group.SetMute': { params: { id: string; mute: boolean }; result: void }; + 'Group.SetStream': { params: { id: string; stream_id: string }; result: void }; + 'Group.SetClients': { params: { id: string; clients: string[] }; result: void }; + 'Group.SetName': { params: { id: string; name: string }; result: void }; + 'Group.GetStatus': { params: { id: string }; result: Group }; + + 'Stream.SetProperty': { params: StreamSetPropertyRpcPayloadParams; result: void }; + 'Stream.Control': { params: StreamControlRpcPayloadParams; result: void }; + 'Stream.Add': { params: { stream: Stream }; result: void }; + 'Stream.Remove': { params: { id: string }; result: void }; +} + +type SnapcastWebSocketMessage = JsonRpcResponse | SnapcastWebsocketNotification; + +// --- Internal Classes & Functions --- + +/** + * Handles WebSocket connection, reconnection, and RPC request/response correlation. + */ +class JsonRpcSocket { private ws$?: WebSocketSubject; - private rpcRequestId = 1; - private rpcResponses$ = new Subject>(); - private serviceSubscriptions = new Subscription(); - - // A single BehaviorSubject to hold the state. Initialized with null. - private stateSubject$ = new BehaviorSubject(null); - - // --- Public Observables for Component Consumption --- - // The main state observable. It's the single source of truth. - public readonly state$: Observable; - // Convenience observables derived from the main state$. - public readonly groups$: Observable; - public readonly streams$: Observable; - public readonly serverDetails$: Observable; - public readonly isConnected$ = new BehaviorSubject(false).asObservable(); - - constructor( - private http: HttpClient - ) { - - this.state$ = this.stateSubject$.asObservable().pipe( - shareReplay({ bufferSize: 1, refCount: true }) - ); - - // Derived observables use distinctUntilChanged with deep comparison for robustness. - this.groups$ = this.state$.pipe(map(state => state?.server?.groups || []), distinctUntilChanged((p, c) => JSON.stringify(p) === JSON.stringify(c))); - this.streams$ = this.state$.pipe(map(state => state?.server?.streams || []), distinctUntilChanged((p, c) => JSON.stringify(p) === JSON.stringify(c))); - this.serverDetails$ = this.state$.pipe(map(state => state?.server?.server), distinctUntilChanged((p, c) => JSON.stringify(p) === JSON.stringify(c))); - } - - // --- Core Connection and RPC Logic --- - - async connect(host: string = this.DEFAULT_HOSTNAME, port: number = this.DEFAULT_PORT, overrideUserPreference: boolean = false): Promise { - console.log(`SnapcastService: connect called with host=${host}, port=${port}, overrideUserPreference=${overrideUserPreference}`); - // Load user preference for hostname if available - const url = await Preferences.get({ key: UserPreference.SERVER_URL }); - // overwrite host if user did set a custom URL - if (url.value && !overrideUserPreference) { - this.USERPREFERENCE_HOSTNAME = url.value; - host = this.USERPREFERENCE_HOSTNAME; - } - + private requestId = 0; + + // Public Streams + public readonly notification$ = new Subject(); + public readonly connected$ = new BehaviorSubject(false); + + // Internal + private readonly rpcResponses$ = new Subject>(); + private socketSubscription: Subscription = new Subscription(); + private reconnectSubscription: Subscription = new Subscription(); + + constructor(private reconnectIntervalMs: number = 3000) {} + + connect(url: string, onOpen?: () => void): void { if (this.ws$ && !this.ws$.closed) return; - this.disconnectInternals(false); - const wsUrl = `ws://${host}:${port}/jsonrpc`; - console.info(`SnapcastService: Connecting to ${wsUrl}...`); + + // Clean up previous attempts + this.disconnect(false); const config: WebSocketSubjectConfig = { - url: wsUrl, + url, openObserver: { next: () => { - console.info('SnapcastService: WebSocket connection established.'); - this.fetchInitialServerStatus(); - - // // Setup periodic refresh - // const refreshSub = timer(this.SERVER_STATUS_REFRESH_INTERVAL_MS, this.SERVER_STATUS_REFRESH_INTERVAL_MS).subscribe(() => { - // this.fetchInitialServerStatus(); - // }); - // this.serviceSubscriptions.add(refreshSub); + console.info(`[JsonRpcSocket] Connected to ${url}`); + this.connected$.next(true); + onOpen?.(); }, }, closeObserver: { next: () => { - console.warn('SnapcastService: WebSocket connection closed. Attempting to reconnect...'); - (this.isConnected$ as BehaviorSubject).next(false); + console.warn(`[JsonRpcSocket] Closed. Reconnecting in ${this.reconnectIntervalMs}ms...`); + this.connected$.next(false); this.ws$ = undefined; - const reconnectSub = timer(this.RECONNECT_INTERVAL_MS).subscribe(() => this.connect(host, port)); - this.serviceSubscriptions.add(reconnectSub); + this.scheduleReconnect(url, onOpen); }, }, - deserializer: (e: MessageEvent): SnapcastWebSocketMessage => { + deserializer: (e) => { try { return JSON.parse(e.data); } - catch (err) { console.error('SnapcastService: Deserialization error', err, e.data); return { error: 'DeserializationError', data: e.data, message: (err as Error).message }; } - }, + catch (err) { console.error('JSON Parse Error', err); return null as any; } + } }; - this.ws$ = webSocket(config); - const messageHandlingSub = this.ws$.pipe( - catchError(error => { console.error('SnapcastService: WebSocket stream error.', error); (this.isConnected$ as BehaviorSubject).next(false); return EMPTY; }) + console.info(`[JsonRpcSocket] Connecting to ${url}...`); + this.ws$ = webSocket(config); + + this.socketSubscription = this.ws$.pipe( + catchError(err => { + console.error('[JsonRpcSocket] Stream Error', err); + // The closeObserver will trigger reconnection logic + return EMPTY; + }) ).subscribe({ - next: msg => this.routeMessage(msg), - error: err => { console.error('SnapcastService: WebSocket fatal subscription error:', err); (this.isConnected$ as BehaviorSubject).next(false); }, + next: (msg) => this.handleMessage(msg), + error: (err) => console.error('[JsonRpcSocket] Fatal Error', err) }); - this.serviceSubscriptions.add(messageHandlingSub); } - private fetchInitialServerStatus(): void { - this.rpc('Server.GetStatus').subscribe({ - next: response => { - if (response.result) { - this.stateSubject$.next(response.result); // Update the state with the full server status - } else if (response.error) { - console.error('SnapcastService: Error in initial status RPC result', response.error); - } - }, - error: err => console.error('SnapcastService: Failed RPC for initial status:', err), + private scheduleReconnect(url: string, onOpen?: () => void) { + this.reconnectSubscription.unsubscribe(); + this.reconnectSubscription = timer(this.reconnectIntervalMs).pipe(take(1)).subscribe(() => { + this.connect(url, onOpen); }); } - private routeMessage(message: SnapcastWebSocketMessage): void { - if (!message || typeof message !== 'object') { console.warn('SnapcastService: Received invalid message', message); return; } - - if ('id' in message && typeof (message as JsonRpcResponse).id === 'number' && !('method' in message)) { - this.rpcResponses$.next(message as JsonRpcResponse); - } else if ('method' in message && typeof (message as SnapcastWebsocketNotification).method === 'string') { - const notification = message as SnapcastWebsocketNotification; - // Server.OnUpdate sends a full state object, replacing the current one. - if (notification.method === 'Server.OnUpdate') { - const serverUpdate = notification.params as ServerOnUpdateNotificationParams; - if (serverUpdate.server) { - this.stateSubject$.next({ server: serverUpdate.server }); - } - } else { - // Other notifications apply a "delta" to the current state. - this.applyNotificationToState(notification); - } - } else if ((message as any).error === 'DeserializationError') { /* Already logged */ } - else { console.warn('SnapcastService: Unknown message structure:', message); } + disconnect(emitState = true) { + this.socketSubscription.unsubscribe(); + this.reconnectSubscription.unsubscribe(); + this.ws$?.complete(); + this.ws$ = undefined; + if (emitState) this.connected$.next(false); } - // --- State Update Logic --- + /** + * Sends a strictly typed JSON-RPC request. + */ + request( + method: M, + params?: SnapcastRpcMethods[M]['params'] + ): Observable { + + // Ensure we are connected before sending + if (!this.ws$ || !this.connected$.value) { + return throwError(() => new Error('WebSocket not connected')); + } - private applyNotificationToState(notification: SnapcastWebsocketNotification): void { - const currentState = this.stateSubject$.getValue(); - if (!currentState) { console.warn('SnapcastService: No current state to apply notification to.', notification); return; } + const id = ++this.requestId; + // Cast params to any because optionality of params is hard to satisfy strictly in generic constraint + const req: JsonRpcRequest = { jsonrpc: '2.0', id, method, params }; + + this.ws$.next(req as any); - // Use `immer` to produce a new immutable state by applying changes to a "draft". - const nextState = produce(currentState, draft => { - const server = draft.server; - if (!server) return; + return this.rpcResponses$.pipe( + filter(res => res.id === id), + take(1), + map(res => { + if (res.error) throw new Error(`RPC Error ${res.error.code}: ${res.error.message}`); + return res.result as SnapcastRpcMethods[M]['result']; + }) + ); + } + + private handleMessage(msg: SnapcastWebSocketMessage) { + if (!msg) return; + + // Check if Response (has ID) + if ('id' in msg) { + this.rpcResponses$.next(msg as JsonRpcResponse); + } + // Check if Notification (has Method but no ID) + else if ('method' in msg) { + this.notification$.next(msg as SnapcastWebsocketNotification); + } + } +} + +/** + * Pure Reducer for Snapcast State using Immer + */ +function snapcastReducer( + currentState: SnapCastServerStatusResponse | null, + notification: SnapcastWebsocketNotification +): SnapCastServerStatusResponse | null { + // If we receive a full update, replace state + if (notification.method === 'Server.OnUpdate') { + const params = notification.params as ServerOnUpdateNotificationParams; + console.log('SnapcastService: Server.OnUpdate received.'); + return params.server ? { server: params.server } : currentState; + } - // This is the logic from the old SnapcastWebsocketNotificationService, - switch (notification.method) { - case 'Client.OnVolumeChanged': - const volParams = notification.params as ClientVolumeChange; - const clientVol = server.groups.flatMap(g => g.clients).find(c => c.id === volParams.id); - if (clientVol) clientVol.config.volume = volParams.volume; + if (!currentState) return null; + + return produce(currentState, draft => { + const server = draft.server; + if (!server) return; + + switch (notification.method) { + case 'Client.OnVolumeChanged': { + const params = notification.params as ClientVolumeChange; + const client = findClient(server, params.id); + if (client) client.config.volume = params.volume; break; - case 'Client.OnConnect': - const connectParams = notification.params as ClientOnConnect; - const newClient = connectParams.client; - console.log(`SnapcastService: Client connected: ${newClient.id}`); + } + case 'Client.OnConnect': { + const params = notification.params as ClientOnConnect; + console.log(`SnapcastService: Client connected: ${params.client.id}`); + // Note: Proper handling requires inserting client into correct Group. + // Since Group ID isn't provided in ClientOnConnect, we rely on Server.GetStatus refresh + // which is triggered in the Service subscription. break; - - case 'Client.OnDisconnect': - const disconnectParams = notification.params as ClientOnDisconnect; - const disconnectedClient = server.groups.flatMap(g => g.clients).find(c => c.id === disconnectParams.id); - if (disconnectedClient) { - console.log(`SnapcastService: Client disconnected: ${disconnectedClient.id}`); - disconnectedClient.connected = false; // Mark as disconnected - } else { - console.warn(`SnapcastService: Client disconnected but not found in current state: ${disconnectParams.id}`); + } + case 'Client.OnDisconnect': { + const params = notification.params as ClientOnDisconnect; + const client = findClient(server, params.id); + if (client) { + client.connected = false; + console.log(`SnapcastService: Client disconnected: ${params.id}`); } break; - case 'Client.OnNameChanged': - const nameParams = notification.params as { id: string; name: string }; - const clientName = server.groups.flatMap(g => g.clients).find(c => c.id === nameParams.id); - if (clientName) { - clientName.config.name = nameParams.name; - console.log(`SnapcastService: Client ${clientName.id} name changed to ${nameParams.name}`); - } else { - console.warn(`SnapcastService: Client name change for unknown client ID: ${nameParams.id}`); + } + case 'Client.OnNameChanged': { + const params = notification.params as { id: string; name: string }; + const client = findClient(server, params.id); + if (client) { + client.config.name = params.name; + console.log(`SnapcastService: Client ${params.id} name -> ${params.name}`); } break; - - case 'Client.OnLatencyChanged': - const latencyParams = notification.params as { id: string; latency: number }; - const clientLatency = server.groups.flatMap(g => g.clients).find(c => c.id === latencyParams.id); - if (clientLatency) { - clientLatency.config.latency = latencyParams.latency; - console.log(`SnapcastService: Client ${clientLatency.id} latency changed to ${latencyParams.latency}`); - } else { - console.warn(`SnapcastService: Client latency change for unknown client ID: ${latencyParams.id}`); - } + } + case 'Client.OnLatencyChanged': { + const params = notification.params as { id: string; latency: number }; + const client = findClient(server, params.id); + if (client) client.config.latency = params.latency; break; - - - case 'Group.OnMute': - const gmParams = notification.params as GroupMuteChange; - const groupMute = server.groups.find(g => g.id === gmParams.id); - if (groupMute) groupMute.muted = gmParams.mute; + } + case 'Group.OnMute': { + const params = notification.params as GroupMuteChange; + const group = server.groups.find(g => g.id === params.id); + if (group) group.muted = params.mute; break; - - case 'Group.OnStreamChanged': - const gsParams = notification.params as GroupStreamChange; - const groupStream = server.groups.find(g => g.id === gsParams.id); - if (groupStream) { - groupStream.stream_id = gsParams.stream_id; - const stream = server.streams.find(s => s.id === gsParams.stream_id); - if (stream) { - console.log(`SnapcastService: Group ${groupStream.id} changed stream to ${stream.id}`); - } else { - console.warn(`SnapcastService: Stream ${gsParams.stream_id} not found for group ${groupStream.id}`); - } - } else { - console.warn(`SnapcastService: Group ${gsParams.id} not found for stream change.`); + } + case 'Group.OnStreamChanged': { + const params = notification.params as GroupStreamChange; + const group = server.groups.find(g => g.id === params.id); + if (group) { + group.stream_id = params.stream_id; + console.log(`SnapcastService: Group ${params.id} stream -> ${params.stream_id}`); } break; - - case 'Group.OnNameChanged': - const gnParams = notification.params as GroupNameChange; - const groupName = server.groups.find(g => g.id === gnParams.id); - if (groupName) { - groupName.name = gnParams.name; - console.log(`SnapcastService: Group ${groupName.id} name changed to ${gnParams.name}`); - } else { - console.warn(`SnapcastService: Group ${gnParams.id} not found for name change.`); - } + } + case 'Group.OnNameChanged': { + const params = notification.params as GroupNameChange; + const group = server.groups.find(g => g.id === params.id); + if (group) group.name = params.name; break; - - case 'Stream.OnProperties': - const streamProps = notification.params as StreamOnProperties; - const stream = server.streams.find(s => s.id === streamProps.id); + } + case 'Stream.OnProperties': { + const params = notification.params as StreamOnProperties; + const stream = server.streams.find(s => s.id === params.id); if (stream) { - console.log(`SnapcastService: Stream ${stream.id} properties updated.`); - console.log('SnapcastService: Stream properties details:', streamProps); - stream.properties = { ...stream.properties, ...streamProps.properties }; - } else { - console.warn(`SnapcastService: Stream ${streamProps.id} not found for properties update.`); - } - break; - case 'Stream.OnUpdate': - const streamUpdate = notification.params as StreamOnUpdate; - const updatedStream = server.streams.find(s => s.id === streamUpdate.id); - if (updatedStream) { - console.log(`SnapcastService: Updating stream ${updatedStream.id} with new status and properties.`); - console.log('SnapcastService: Stream update details:', streamUpdate); - updatedStream.status = streamUpdate.stream.status; - updatedStream.properties = { ...updatedStream.properties, ...streamUpdate.stream.properties }; + stream.properties = { ...stream.properties, ...params.properties }; } break; - case 'Server.OnUpdate': - const serverUpdate = notification.params as ServerOnUpdateNotificationParams; - if (serverUpdate.server) { - console.log('SnapcastService: Server.OnUpdate received, replacing current state with new server data.'); - console.log('SnapcastService: Server update details:', serverUpdate); - draft.server = serverUpdate.server; - } else { - console.warn('SnapcastService: Server.OnUpdate received but no server data found in notification.'); + } + case 'Stream.OnUpdate': { + const params = notification.params as StreamOnUpdate; + const stream = server.streams.find(s => s.id === params.id); + if (stream) { + stream.status = params.stream.status; + stream.properties = params.stream.properties; } break; + } + } + }); +} - default: - console.warn(`SnapcastService: Unhandled notification method: ${notification.method}`); - break; +function findClient(server: SnapServerModel, clientId: string): Client | undefined { + for (const group of server.groups) { + const client = group.clients.find(c => c.id === clientId); + if (client) return client; + } + return undefined; +} + + +@Injectable({ providedIn: 'root' }) +export class SnapcastService implements OnDestroy { + // --- Setup & State --- + private readonly socket: JsonRpcSocket; + private readonly _state = new BehaviorSubject(null); + + // --- Public Observables --- + public readonly state$ = this._state.asObservable(); + public readonly isConnected$: Observable; + + public readonly groups$ = this.state$.pipe( + map(s => s?.server.groups || []), + distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)) + ); + public readonly streams$ = this.state$.pipe( + map(s => s?.server.streams || []), + distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)) + ); + public readonly serverDetails$ = this.state$.pipe( + map(s => s?.server.server), + distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)) + ); + + constructor(private http: HttpClient) { + this.socket = new JsonRpcSocket(); + this.isConnected$ = this.socket.connected$.asObservable(); + + // Wire up the state reducer + this.socket.notification$.subscribe(notification => { + const currentState = this._state.value; + const newState = snapcastReducer(currentState, notification); + + if (newState !== currentState) { + this._state.next(newState); + } + + // Refresh full state on topology changes (Client connect/disconnect) + // because the notification doesn't tell us which group the client belongs to + if (notification.method === 'Client.OnConnect' || notification.method === 'Client.OnDisconnect') { + this.refreshState(); } }); + } + + async connect(host = environment.snapcastServerUrl, port = 1780, overrideUserPreference = false): Promise { + let finalHost = host; + + // Check User Preferences + if (!overrideUserPreference) { + const pref = await Preferences.get({ key: UserPreference.SERVER_URL }); + if (pref.value) finalHost = pref.value; + } - // Emit the new, immutable state. - this.stateSubject$.next(nextState); + const wsUrl = `ws://${finalHost}:${port}/jsonrpc`; + + this.socket.connect(wsUrl, () => { + // On Connect success: + this.refreshState(); + }); } - private rpc(method: string, params?: P): Observable> { - if (!this.ws$ || this.ws$.closed) return throwError(() => new Error('SnapcastService: WebSocket not connected.')); - const id = this.rpcRequestId++; - const request: JsonRpcRequest

= { jsonrpc: '2.0', id, method, ...(params && { params }) }; - this.ws$.next(request as SnapcastWebSocketMessage); - return this.rpcResponses$.pipe( - filter(response => response.id === id), - take(1), - map(response => response as JsonRpcResponse), - catchError(err => { console.error(`SnapcastService: RPC error for ${method} (id ${id})`, err); return throwError(() => new Error(`RPC for ${method} failed`)); }) - ); + public disconnect() { + this.socket.disconnect(); } - // --- "Simplified" Action Methods --- + public refreshState(): Observable { + const request$ = this.socket.request('Server.GetStatus', undefined).pipe( + tap((res) => this._state.next(res)), + shareReplay(1) + ); - // CLIENT ACTIONS + // Ensure subscription so the request is actually processed and state updated, + // even if the caller ignores the return value. + request$.subscribe({ + error: (err) => console.error('SnapcastService: Failed to fetch state', err) + }); + return request$; + } + // --- Public API (Strictly Typed) --- - public setClientVolumePercent(clientId: string, percent: number): Observable { - if (percent < 0 || percent > 100) return throwError(() => new Error('Volume percentage must be between 0 and 100.')); + public setClientVolumePercent(id: string, percent: number): Observable { + if (percent < 0 || percent > 100) return throwError(() => new Error('Volume must be 0-100')); + + // We need the current mute status to send the full Volume object + const client = this.findClientInState(id); + if (!client) return throwError(() => new Error(`Client ${id} not found locally`)); - // Get the current state ONCE to build the full RPC payload, as Snapcast API needs both percent and mute. - return this.state$.pipe( - take(1), - switchMap(currentState => { - const client = currentState?.server.groups.flatMap(g => g.clients).find(c => c.id === clientId); - if (!client) return throwError(() => new Error(`Client ${clientId} not found in current state.`)); - - const volumePayload: Volume = { - percent: percent, - muted: client.config.volume.muted // Use current mute status - }; - return this.rpc('Client.SetVolume', { id: clientId, volume: volumePayload }); - }), - map((): void => void 0), // Map successful RPC response to void - catchError(err => { - console.error(`SnapcastService: Failed to set volume for client ${clientId}`, err); - return throwError(() => err); - }) - ); + const volume: Volume = { percent, muted: client.config.volume.muted }; + return this.socket.request('Client.SetVolume', { id, volume }); } - setClientName(clientId: string, name: string): Observable { - return this.rpc('Client.SetName', { id: clientId, name }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set name for client ${clientId}`, err); - return throwError(() => err); - }) - ); + public setClientName(id: string, name: string) { + return this.socket.request('Client.SetName', { id, name }); } - setClientLatency(clientId: string, latency: number): Observable { - if (latency < 0) return throwError(() => new Error('Latency must be a non-negative number.')); - return this.rpc('Client.SetLatency', { id: clientId, latency }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set latency for client ${clientId}`, err); - return throwError(() => err); - }) - ); + public setClientLatency(id: string, latency: number) { + if (latency < 0) return throwError(() => new Error('Latency must be positive')); + return this.socket.request('Client.SetLatency', { id, latency }); } - // Function added just for completeness, not used in the app yet. - getClientStatus(id: string): Observable { - return this.rpc('Client.GetStatus', { id }).pipe( - map(response => response.result as Client | undefined), - catchError(err => { - console.error(`SnapcastService: Failed to get status for client ${id}`, err); - return throwError(() => err); - }) - ); + public getClientStatus(id: string) { + return this.socket.request('Client.GetStatus', { id }); } - - // GROUP ACTIONS - - - - - setGroupName(groupId: string, name: string): Observable { - return this.rpc('Group.SetName', { id: groupId, name }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set group name for group ${groupId}`, err); - return throwError(() => err); - }) - ); + public setGroupName(id: string, name: string) { + return this.socket.request('Group.SetName', { id, name }); } - - setGroupStream(groupId: string, streamId: string): Observable { - return this.rpc('Group.SetStream', { id: groupId, stream_id: streamId }).pipe( - map( - (): void => { - console.log(`SnapcastService: Successfully set stream ${streamId} for group ${groupId}`); - this.refreshState(); // Refresh state after setting stream - } - ), - catchError(err => { - console.error(`SnapcastService: Failed to set stream for group ${groupId}`, err); - return throwError(() => err); - }) - + public setGroupStream(id: string, stream_id: string) { + return this.socket.request('Group.SetStream', { id, stream_id }).pipe( + tap(() => this.refreshState()) // Refresh to ensure Group state (streams) is consistent ); - } - setGroupClients(groupId: string, clientIds: string[]): Observable { - return this.rpc('Group.SetClients', { id: groupId, clients: clientIds }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set clients for group ${groupId}`, err); - return throwError(() => err); - }) - ); + public setGroupClients(id: string, clients: string[]) { + return this.socket.request('Group.SetClients', { id, clients }); } - setGroupMute(groupId: string, mute: boolean): Observable { - return this.rpc('Group.SetMute', { id: groupId, mute }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set mute for group ${groupId}`, err); - return throwError(() => err); - }) - ); + public setGroupMute(id: string, mute: boolean) { + return this.socket.request('Group.SetMute', { id, mute }); } - getGroupStatus(groupId: string): Observable { - return this.rpc('Group.GetStatus', { id: groupId }).pipe( - map(response => response.result as Group | undefined), - catchError(err => { - console.error(`SnapcastService: Failed to get status for group ${groupId}`, err); - return throwError(() => err); - }) - ); + public getGroupStatus(id: string) { + return this.socket.request('Group.GetStatus', { id }); } - - - // SERVER ACTIONS - - getServerStatus(): Observable { - return this.rpc('Server.GetStatus').pipe( - map(response => response.result), - catchError(err => { - console.error('SnapcastService: Failed to get server status', err); - return throwError(() => err); - }) - ); + + public getServerStatus() { + return this.socket.request('Server.GetStatus', undefined); } - getServerRpcVersion(): Observable { - return this.rpc('Server.GetRpcVersion').pipe( - map(response => response.result?.version), - catchError(err => { - console.error('SnapcastService: Failed to get server RPC version', err); - return throwError(() => err); - }) - ); - + public getServerRpcVersion() { + return this.socket.request('Server.GetRpcVersion', undefined); } - deleteServerClient(clientId: string): Observable { - return this.rpc('Server.DeleteClient', { id: clientId }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to delete client ${clientId}`, err); - return throwError(() => err); - }) - ); + public deleteServerClient(id: string) { + return this.socket.request('Server.DeleteClient', { id }); } - // Stream Actions - - setStreamProperty(streamId: string, property: keyof Stream, value: any): Observable { - const params: StreamSetPropertyRpcPayloadParams = { id: streamId, property, value }; - return this.rpc('Stream.SetProperty', params).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to set property ${property} for stream ${streamId}`, err); - return throwError(() => err); - }) - ); + public setStreamProperty(id: string, property: string, value: any) { + return this.socket.request('Stream.SetProperty', { id, property, value }); } - addStream(stream: Stream): Observable { - return this.rpc('Stream.Add', { stream }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to add stream`, err); - return throwError(() => err); - }) - ); + public addStream(stream: Stream) { + return this.socket.request('Stream.Add', { stream }); } - removeStream(streamId: string): Observable { - return this.rpc('Stream.Remove', { id: streamId }).pipe( - map((): void => void 0), - catchError(err => { - console.error(`SnapcastService: Failed to remove stream ${streamId}`, err); - return throwError(() => err); - }) - ); + public removeStream(id: string) { + return this.socket.request('Stream.Remove', { id }); } + // --- Helpers --- - - - - // Mock server state for testing purposes - - public mockServerState(): void { - const url = "assets/mock/json/server-state.json" - this.http.get(url).subscribe({ - next: (data) => { - console.log('Mock server state loaded:', data); - this.stateSubject$.next(data); - }, - error: (err) => { - console.error('Failed to load mock server state:', err); - } - }); + private findClientInState(id: string): Client | undefined { + const s = this._state.value; + if (!s) return undefined; + return findClient(s.server, id); } - - // TODO: ... other get methods ... - - - - // --- Lifecycle and Disconnect --- - - public refreshState() { - this.fetchInitialServerStatus(); + + public mockServerState() { + const url = "assets/mock/json/server-state.json"; + this.http.get(url).subscribe(s => this._state.next(s)); } - public disconnect(): void { - this.disconnectInternals(true); - (this.isConnected$ as BehaviorSubject).next(false); - } - private disconnectInternals(fullCleanup: boolean): void { - if (this.ws$) { this.ws$.complete(); this.ws$ = undefined; } - if (fullCleanup) { this.serviceSubscriptions.unsubscribe(); this.serviceSubscriptions = new Subscription(); } - } - ngOnDestroy(): void { + ngOnDestroy() { this.disconnect(); - this.stateSubject$.complete(); - this.rpcResponses$.complete(); - (this.isConnected$ as BehaviorSubject).complete(); + this._state.complete(); } -} \ No newline at end of file +} diff --git a/src/theme/variables.scss b/src/theme/variables.scss index 673d529..6f985b8 100644 --- a/src/theme/variables.scss +++ b/src/theme/variables.scss @@ -108,6 +108,9 @@ border-radius: 8px 8px 0 0; --border-radius: 8px 8px 0 0; } + .refresher-refreshing-text, .refresher-pulling-text { + font-size: 12px; + } } @media (prefers-color-scheme: dark) {