
Decompose SearchRecommendationCollector into smaller types #356

@jubishop


Context

PodHaven/Views/Search/Models/SearchRecommendationCollector.swift ships at ~965 lines in PR #284 (it grew after this issue was filed — see the Update below). It works, is well-tested, and reflects the v1 plan in docs/initiatives/search-recommendations.md. As a follow-up, decompose it into smaller types so concerns stop interleaving.

The collector currently combines:

  • typed-search overlay lifecycle (overlay storage, query-generation guard, prune/clear)
  • trending cache (shared permanent IdentifiedArrayOf<CachedPodcastEntry>, per-trending source index)
  • pick-index bookkeeping (pickIndex: [MediaGUID: CachedPodcastEntry], register/unregister/removePick)
  • scoring-availability gating (scoringAvailability enum, awaitScoringContext, watcher task tracking close → open transitions)
  • drain task + work queue (queued: OrderedSet<FeedURL> backlog, inFlight: Set<FeedURL>, queueContinuation: AsyncStream<FeedURL>.Continuation, runDrainLoop consuming the stream into a withDiscardingTaskGroup, plus the processFeedURL detach guards — isFeedURLDetached / ObjectIdentifier checks — and the mid-fetch re-schedule "tails")
  • per-podcast pipeline (runPipeline static, filteredCandidates, isDiscoveryCandidate)
  • lifecycle / reset
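The drain bullet above can be modeled in a few dozen lines. This is a simplified sketch, not the collector's code, and it makes several substitutions:

- FeedURL is a String stand-in.
- A plain array replaces OrderedSet, which avoids the swift-collections dependency.
- `processed` is a test-only hook.
- The sketch dedups only against the backlog; how the real schedule path treats in-flight URLs is an assumption here.
- It uses withTaskGroup instead of withDiscardingTaskGroup so it builds on older toolchains. The discarding variant does not retain child results, which suits a long-lived loop.
- The detach guards and re-schedule tails are omitted.

```swift
// Hypothetical stand-in for the app's FeedURL type.
typealias FeedURL = String

/// Sketch of the drain layer: `queued` dedups the backlog, `inFlight` tracks
/// URLs currently being processed, and an AsyncStream feeds a task group.
@MainActor final class DrainSketch {
    private var queued: [FeedURL] = []             // stand-in for OrderedSet<FeedURL>
    private var inFlight: Set<FeedURL> = []
    private let stream: AsyncStream<FeedURL>
    private let queueContinuation: AsyncStream<FeedURL>.Continuation
    private(set) var processed: [FeedURL] = []     // test hook, not in the real collector

    init() {
        var continuation: AsyncStream<FeedURL>.Continuation!
        stream = AsyncStream { continuation = $0 }  // build closure runs synchronously
        queueContinuation = continuation
    }

    /// Enqueue a URL unless it is already waiting in the backlog.
    func schedule(_ url: FeedURL) {
        guard !queued.contains(url) else { return }
        queued.append(url)
        queueContinuation.yield(url)
    }

    /// Ends the stream so runDrainLoop returns once in-flight work finishes.
    func stopScheduling() { queueContinuation.finish() }

    /// Consumes the stream, running each URL as a child task.
    func runDrainLoop(process: @escaping @Sendable (FeedURL) async -> Void) async {
        await withTaskGroup(of: Void.self) { group in
            for await url in stream {
                queued.removeAll { $0 == url }      // leaves the backlog...
                inFlight.insert(url)                // ...and becomes in flight
                group.addTask {
                    await process(url)
                    await self.markDone(url)        // hop back to the main actor
                }
            }
        }
    }

    private func markDone(_ url: FeedURL) {
        inFlight.remove(url)
        processed.append(url)
    }
}
```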

Proposal A — extract SearchPipelineRunner

Pull the nonisolated static func runPipeline(...) plus filteredCandidates and isDiscoveryCandidate into their own type (for example, an enum SearchPipelineRunner with static methods). Inputs: DownloadTask, Podcast.ID?, ITunesPodcastID?, Episode.ID?, ContextualEmbedding, RecommendationEngine, any Databasing. Output: PipelineResult.

Rationale: the collector becomes "schedule + cache + post-process," not "schedule + cache + fetch + parse + embed + score." The runner is already nonisolated static and pure-ish; lifting it is a near-mechanical move. Tests can target the runner directly with fake DownloadTask + fake embedding + fake engine for narrower unit coverage.

Constraints:

  • Keep scoreFloor, episodesPerPodcast accessible to the runner (either as runner-owned constants or threaded through).
  • Self.log reference in the embedding-failure catch needs to follow.
  • runPipeline now also takes an isDetached: @Sendable () async -> Bool closure (added with the drain rewrite); the runner must carry it.
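Below is a minimal sketch of the extracted runner. It assumes the real inputs (DownloadTask, ContextualEmbedding, RecommendationEngine, any Databasing, and the IDs) can be hidden behind two small protocols.

The following are all hypothetical: CandidateFetching, EpisodeScoring, EpisodeCandidate, the PipelineResult cases, and the scoreFloor / episodesPerPodcast values and their interpretation (a minimum score and a per-podcast cap). Logging via Self.log is left out.

The sketch shows the constraints listed above: the constants are owned by the runner, and the isDetached closure is checked after each await.

```swift
// Hypothetical stand-ins; the real runner takes DownloadTask, ContextualEmbedding,
// RecommendationEngine, any Databasing, and the podcast/episode IDs.
struct EpisodeCandidate: Sendable { let id: String; let isDiscoveryCandidate: Bool }
struct ScoredEpisode: Sendable, Equatable { let id: String; let score: Double }

enum PipelineResult: Equatable {
    case scored([ScoredEpisode])
    case detached   // the entry was purged while the pipeline was awaiting
    case failed
}

protocol CandidateFetching: Sendable { func fetchEpisodes() async throws -> [EpisodeCandidate] }
protocol EpisodeScoring: Sendable { func score(_ id: String) async -> Double }

enum SearchPipelineRunner {
    // Placeholder values; today the real constants live on the collector.
    static let scoreFloor = 0.5
    static let episodesPerPodcast = 3

    static func filteredCandidates(_ episodes: [EpisodeCandidate]) -> [EpisodeCandidate] {
        episodes.filter(\.isDiscoveryCandidate)
    }

    static func runPipeline(
        fetcher: some CandidateFetching,
        scorer: some EpisodeScoring,
        isDetached: @Sendable () async -> Bool
    ) async -> PipelineResult {
        guard let episodes = try? await fetcher.fetchEpisodes() else { return .failed }
        if await isDetached() { return .detached }      // re-check after every await
        var scored: [ScoredEpisode] = []
        for candidate in filteredCandidates(episodes) {
            let score = await scorer.score(candidate.id)
            if score >= scoreFloor { scored.append(ScoredEpisode(id: candidate.id, score: score)) }
        }
        if await isDetached() { return .detached }
        let top = scored.sorted { $0.score > $1.score }.prefix(episodesPerPodcast)
        return .scored(Array(top))
    }
}
```

Because every dependency is a protocol, a test can pass fakes and call runPipeline directly. This gives the narrower unit coverage the rationale describes.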

Proposal B — extract ScoringReadinessGate

Pull scoringAvailability: ScoringAvailability (the .unknown / .ready / .unavailable enum), awaitScoringContext(), ensureScoringContextWatcherRunning(), and handleScoringContextBecameAvailable() into a ScoringReadinessGate type. It owns the engine subscription, the watcher task, and the close→open transition detection.

API sketch:

```swift
@MainActor final class ScoringReadinessGate {
    enum State { case unknown, ready, unavailable }
    private(set) var state: State
    func awaitReady() async  // returns once .ready or .unavailable is determined
    func startWatching(onTransitionToReady: @escaping @MainActor () -> Void)
    func reset()
}
```

Rationale: removes ~80 lines from the collector and makes the gating story self-contained. The current handleScoringContextBecameAvailable reaches into the collector's permanent/temporary arrays to re-queue entries; after extraction, the gate emits a callback and the collector decides what to re-queue. Easier to test the close→open detection logic in isolation (the existing test removedPickSurvivesScoringContextCycle would still pin the end-to-end behavior).

Interaction with the scoring short-circuit (the old "Finding 2," now landed in PR #284 — commit 20dd6d46): contrary to the original note here, the drain does not early-return on state == .unavailable. awaitScoringContext keeps blocking on $scoringRevision.stream() while unavailable, and a separate lifetime watcher (ensureScoringContextWatcherRunning, with dropFirst()) tracks close→open and re-queues empty-.scored entries. So there are now two concurrent subscriptions to $scoringRevision.stream() doing related-but-distinct work. That is a strong argument for this extraction: a ScoringReadinessGate should own a single subscription and serve both awaitReady() and onTransitionToReady off it, collapsing the duplication.
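Below is one way the gate could own a single subscription and serve both awaitReady() and onTransitionToReady.

The main assumption is that availability arrives as a hypothetical AsyncStream<Bool>. It stands in for $scoringRevision.stream() mapped to "is the scoring context usable?", so startWatching takes that stream in addition to the callback from the API sketch.

Following the interaction note above, awaitReady() keeps waiting while the state is .unavailable. Only an .unavailable → .ready edge fires the callback. The first value just seeds the state, roughly like the watcher's dropFirst().

```swift
@MainActor final class ScoringReadinessGate {
    enum State { case unknown, ready, unavailable }
    private(set) var state: State = .unknown
    private var waiters: [CheckedContinuation<Void, Never>] = []
    private var watcher: Task<Void, Never>?
    private var onTransitionToReady: (@MainActor () -> Void)?

    /// Starts the single subscription that serves both awaitReady() and the callback.
    /// `availability` is a hypothetical stand-in for $scoringRevision.stream().
    func startWatching(
        _ availability: AsyncStream<Bool>,
        onTransitionToReady: @escaping @MainActor () -> Void
    ) {
        self.onTransitionToReady = onTransitionToReady
        watcher?.cancel()
        watcher = Task { [weak self] in     // inherits @MainActor isolation
            for await isAvailable in availability {
                self?.apply(isAvailable)
            }
        }
    }

    /// Suspends until the state is .ready; keeps waiting while .unavailable.
    func awaitReady() async {
        if state == .ready { return }
        await withCheckedContinuation { waiters.append($0) }
    }

    /// Stops watching; pending waiters stay suspended until a later .ready.
    func reset() {
        watcher?.cancel()
        watcher = nil
        state = .unknown
    }

    private func apply(_ isAvailable: Bool) {
        let previous = state
        state = isAvailable ? .ready : .unavailable
        guard state == .ready else { return }
        let resumed = waiters
        waiters.removeAll()
        resumed.forEach { $0.resume() }
        // Only a close → open edge re-queues work; the first value just seeds state.
        if previous == .unavailable { onTransitionToReady?() }
    }
}
```

In this sketch, reset() does not release pending waiters. Whether the real gate should release them on reset is an open design choice.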

Proposal C — extract PickIndex

Pull pickIndex: [MediaGUID: CachedPodcastEntry], registerPicks(_:for:), unregisterPicks(of:), and removePick(mediaGUID:) into a PickIndex type owned by the collector.

API sketch:

```swift
@MainActor final class PickIndex {
    func register(_ scored: [ScoredEpisode], for entry: CachedPodcastEntry)
    func unregister(of entry: CachedPodcastEntry)
    // Returns the entry whose picks held the GUID, or nil. Caller handles
    // entry-level cleanup (status flip to .exhausted).
    func remove(mediaGUID: MediaGUID) -> CachedPodcastEntry?
}
```

Rationale: the current invariant ("each scored episode lives in exactly one entry, so picks added here can never collide with another entry's index. Re-scoring an already-scored entry would, so unregister the old picks first") is non-trivial and easy to forget. A dedicated type makes register call unregister internally and gives a single place to assert and document the invariant. The .exhausted flip logic stays on the collector, since it touches entry.status.

Tests can verify directly that register replaces an entry's earlier picks (unregister, then register) without spinning up the full pipeline.
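Below is a possible body for PickIndex. It assumes the type keeps the collector's single [MediaGUID: CachedPodcastEntry] dictionary. Other assumptions:

- ScoredEpisode and CachedPodcastEntry are minimal stand-ins.
- entry(for:) is a hypothetical test hook.
- @MainActor is dropped so the sketch runs standalone.

register always calls unregister first, and an assert pins the one-entry-per-episode invariant.

```swift
// Hypothetical stand-ins for the app's types.
typealias MediaGUID = String
struct ScoredEpisode { let mediaGUID: MediaGUID }
final class CachedPodcastEntry {
    let feedURL: String
    init(feedURL: String) { self.feedURL = feedURL }
}

final class PickIndex {
    private var entryByGUID: [MediaGUID: CachedPodcastEntry] = [:]

    /// Replaces `entry`'s picks. Unregistering first keeps re-scoring safe.
    func register(_ scored: [ScoredEpisode], for entry: CachedPodcastEntry) {
        unregister(of: entry)
        for episode in scored {
            // Invariant: each scored episode lives in exactly one entry.
            let existing = entryByGUID[episode.mediaGUID]
            assert(existing == nil || existing === entry,
                   "\(episode.mediaGUID) already belongs to another entry")
            entryByGUID[episode.mediaGUID] = entry
        }
    }

    /// Drops every pick owned by `entry` (linear scan over the index).
    func unregister(of entry: CachedPodcastEntry) {
        entryByGUID = entryByGUID.filter { $0.value !== entry }
    }

    /// Returns the entry whose picks held the GUID, or nil. The caller handles
    /// entry-level cleanup such as flipping status to .exhausted.
    func remove(mediaGUID: MediaGUID) -> CachedPodcastEntry? {
        entryByGUID.removeValue(forKey: mediaGUID)
    }

    /// Hypothetical test hook: which entry currently owns a pick.
    func entry(for mediaGUID: MediaGUID) -> CachedPodcastEntry? { entryByGUID[mediaGUID] }
}
```

In this sketch, unregister(of:) scans the whole index. A reverse index keyed by ObjectIdentifier(entry) would make it proportional to the entry's own picks. The cost is a second map that must be kept in sync.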

Suggested order

  1. Proposal A (mechanical, smallest blast radius)
  2. Proposal C (small, makes invariant explicit)
  3. Proposal B (touches the most interesting concurrency, and now also collapses the two $scoringRevision subscriptions; it was ordered last so the scoring short-circuit would land first, which it now has)

Each can ship as its own PR. All three together should drop the collector from ~965 to ~500 lines while keeping the public surface (setActiveSource, recordSourcePodcasts, reset, removePick, picks(for:), bannerState(for:), bannerState, activeSource) unchanged. (visiblePicks is being deleted in PR #284 as a test-only surface, so the decomposition no longer needs to preserve it.)

Originated from code review on PR #284.

Update (2026-05-30, re-review of PR #284)

A second review of PR #284 (now at HEAD 51e6b7d4; the original review is the one that filed this issue) surfaced changes and insights relevant to the decomposition:

  • The drain layer was rewritten after this issue was filed. pendingDrainQueue + drainContinuation + waitForWork/dequeueNext + the manual bounded task group are gone (commits 0c48446b, 16efe8c5). The current shape is queued (backlog dedup) + inFlight + an AsyncStream<FeedURL> (queueContinuation) consumed by runDrainLoop into a withDiscardingTaskGroup. The "combines" list and Proposal A constraints above are updated to match. The biggest new complexity source is in processFeedURL: the isFeedURLDetached / ObjectIdentifier detach guards and the two mid-fetch re-schedule "tails" that re-queue an entry a purge swapped in. Any drain extraction must carry these.

  • The scoring short-circuit (old "Finding 2") has landed (20dd6d46), but not as the original Proposal B note assumed — see the corrected interaction note in Proposal B. Net: two concurrent $scoringRevision.stream() subscriptions now exist, which makes Proposal B more valuable.

  • visiblePicks is being deleted in PR #284 (Search-tab top-picks discovery list) as a test-only surface with no production caller, so it has been removed from the "unchanged public surface" list.

  • isDiscoveryCandidate stays aligned with the canonical Episode.candidate (correction). An earlier note here said PR #284 adds a cache check to the gate; that change was reverted. Cached episodes remain discovery candidates (they still show in topRecommendations/UpNext). Instead, PR #284 makes caching and save-in-cache non-consuming by dropping their didPerformAction calls in ManagingEpisodes, so Proposal A lifts isDiscoveryCandidate unchanged.
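The processFeedURL detach guard described in the Update reduces to an identity check after each await. This is a simplified sketch with these assumptions:

- The cache is a plain dictionary.
- Status is a String.
- There is a single tail, whereas the real processFeedURL has two.
- DetachGuardSketch, finishFetch, and rescheduled are hypothetical names.

The sketch holds the starting entry strongly across the fetch. This matters because ObjectIdentifier values are only unique for the lifetime of the object. Otherwise, a freed entry's address could be reused by its replacement.

```swift
// Hypothetical stand-ins for the collector's cache and entry types.
typealias FeedURL = String
final class CachedPodcastEntry {
    let feedURL: FeedURL
    var status = "pending"
    init(feedURL: FeedURL) { self.feedURL = feedURL }
}

final class DetachGuardSketch {
    private(set) var cache: [FeedURL: CachedPodcastEntry] = [:]
    private(set) var rescheduled: [FeedURL] = []

    /// Inserting for an existing URL models a purge swapping in a fresh entry.
    func insert(_ entry: CachedPodcastEntry) { cache[entry.feedURL] = entry }

    /// True when the entry that started the fetch is no longer cached for its URL.
    func isFeedURLDetached(_ feedURL: FeedURL, startedWith entry: CachedPodcastEntry) -> Bool {
        guard let current = cache[feedURL] else { return true }
        return ObjectIdentifier(current) != ObjectIdentifier(entry)
    }

    /// Runs after the fetch's await: commit only if still attached; otherwise
    /// re-schedule the URL so the replacement entry gets processed (the "tail").
    func finishFetch(_ feedURL: FeedURL, startedWith entry: CachedPodcastEntry, result: String) {
        guard !isFeedURLDetached(feedURL, startedWith: entry) else {
            if cache[feedURL] != nil { rescheduled.append(feedURL) }
            return
        }
        entry.status = result
    }
}
```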
