Context
PodHaven/Views/Search/Models/SearchRecommendationCollector.swift ships at ~965 lines in PR #284 (it grew after this issue was filed; see the Update below). It works, is well-tested, and reflects the v1 plan in docs/initiatives/search-recommendations.md. As a follow-up, decompose it into smaller types so concerns stop interleaving.
The collector currently combines:
permanent IdentifiedArrayOf<CachedPodcastEntry>, per-trending source index
pickIndex: [MediaGUID: CachedPodcastEntry], register/unregister/removePick
scoring-availability gating (scoringAvailability enum, awaitScoringContext, watcher task tracking close → open transitions)
drain task + work queue (queued: OrderedSet<FeedURL> backlog, inFlight: Set<FeedURL>, queueContinuation: AsyncStream<FeedURL>.Continuation, runDrainLoop consuming the stream into a withDiscardingTaskGroup, plus the processFeedURL detach guards, i.e. the isFeedURLDetached / ObjectIdentifier checks, and the mid-fetch re-schedule "tails")
runPipeline static, filteredCandidates, isDiscoveryCandidate
Proposal A — extract SearchPipelineRunner
Pull the nonisolated static func runPipeline(...) plus filteredCandidates and isDiscoveryCandidate into their own type (or enum SearchPipelineRunner with static methods). Inputs: DownloadTask, Podcast.ID?, ITunesPodcastID?, Episode.ID?, ContextualEmbedding, RecommendationEngine, any Databasing. Output: PipelineResult.
Rationale: the collector becomes "schedule + cache + post-process," not "schedule + cache + fetch + parse + embed + score." The runner is already nonisolated static and pure-ish; lifting it is a near-mechanical move. Tests can target the runner directly with fake DownloadTask + fake embedding + fake engine for narrower unit coverage.
Constraints:
Keep scoreFloor, episodesPerPodcast accessible to the runner (either as runner-owned constants or threaded through).
Self.log reference in the embedding-failure catch needs to follow.
The runner now also takes the isDetached: @Sendable () async -> Bool closure threaded through runPipeline (added with the drain rewrite) — carry it.
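A minimal sketch of how the lifted runner might look, under stated assumptions. The real inputs (DownloadTask, ContextualEmbedding, RecommendationEngine, any Databasing) are replaced here by closures so the block stands alone. CandidateEpisode, the two PipelineResult cases, and the constant values are hypothetical stand-ins, not the collector's actual definitions.

```swift
// Hypothetical stand-ins: the real runner takes DownloadTask, ContextualEmbedding,
// RecommendationEngine and any Databasing. Closures keep this sketch self-contained.
struct CandidateEpisode: Sendable {
  let mediaGUID: String
  let title: String
  let isDiscoveryCandidate: Bool
}

struct ScoredEpisode: Sendable, Equatable {
  let mediaGUID: String
  let score: Double
}

// Assumed shape; the issue only names the type, not its cases.
enum PipelineResult: Sendable, Equatable {
  case detached                 // entry was purged mid-flight, so the work is dropped
  case scored([ScoredEpisode])
}

enum SearchPipelineRunner {
  // Assumed values. The constraint is only that the runner can reach them.
  static let scoreFloor = 0.5
  static let episodesPerPodcast = 3

  static func filteredCandidates(_ episodes: [CandidateEpisode]) -> [CandidateEpisode] {
    episodes.filter(\.isDiscoveryCandidate)
  }

  static func runPipeline(
    fetch: @Sendable () async throws -> [CandidateEpisode],
    score: @Sendable (CandidateEpisode) async -> Double,
    isDetached: @Sendable () async -> Bool
  ) async throws -> PipelineResult {
    let episodes = try await fetch()
    // Re-check after the suspension point: a purge may have swapped the entry.
    if await isDetached() { return .detached }

    var scored: [ScoredEpisode] = []
    for episode in filteredCandidates(episodes) {
      let value = await score(episode)
      if value >= scoreFloor {
        scored.append(ScoredEpisode(mediaGUID: episode.mediaGUID, score: value))
      }
    }
    if await isDetached() { return .detached }

    // Keep only the best few picks per podcast.
    let top = scored.sorted { $0.score > $1.score }.prefix(episodesPerPodcast)
    return .scored(Array(top))
  }
}
```

Because every dependency arrives as a parameter, a test can call runPipeline with fakes and never touch the collector's scheduling or cache state.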
Proposal B — extract ScoringReadinessGate
Pull scoringAvailability: ScoringAvailability (the .unknown / .ready / .unavailable enum), awaitScoringContext(), ensureScoringContextWatcherRunning(), and handleScoringContextBecameAvailable() into a ScoringReadinessGate type. It owns the engine subscription, the watcher task, and the close→open transition detection.
API sketch:
@MainActor
final class ScoringReadinessGate {
  enum State { case unknown, ready, unavailable }
  private(set) var state: State
  func awaitReady() async  // returns once .ready or .unavailable is determined
  func startWatching(onTransitionToReady: @escaping @MainActor () -> Void)
  func reset()
}
Rationale: removes ~80 lines from the collector and makes the gating story self-contained. The current handleScoringContextBecameAvailable reaches into the collector's permanent/temporary arrays to re-queue entries; after extraction, the gate emits a callback and the collector decides what to re-queue. Easier to test the close→open detection logic in isolation (the existing test removedPickSurvivesScoringContextCycle would still pin the end-to-end behavior).
Interaction with the scoring short-circuit (the old "Finding 2," now landed in PR #284 — commit 20dd6d46): contrary to the original note here, the drain does not early-return on state == .unavailable. awaitScoringContext keeps blocking on $scoringRevision.stream() while unavailable, and a separate lifetime watcher (ensureScoringContextWatcherRunning, with dropFirst()) tracks close→open and re-queues empty-.scored entries. So there are now two concurrent subscriptions to $scoringRevision.stream() doing related-but-distinct work. That is a strong argument for this extraction: a ScoringReadinessGate should own a single subscription and serve both awaitReady() and onTransitionToReady off it, collapsing the duplication.
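One way the single-subscription idea could be sketched, with several assumptions. The AsyncStream<Bool> parameter is a hypothetical stand-in for $scoringRevision.stream(), reduced to "is a scoring context open?". Here awaitReady() keeps suspending while unavailable, following the interaction note above rather than the comment in the API sketch. Cancellation of waiters is not handled.

```swift
@MainActor
final class ScoringReadinessGate {
  enum State { case unknown, ready, unavailable }
  private(set) var state: State = .unknown

  private var waiters: [CheckedContinuation<Void, Never>] = []
  private var onTransitionToReady: (@MainActor () -> Void)?
  private var watcher: Task<Void, Never>?

  // `availability` is an assumed input: one stream feeds both awaitReady()
  // and the transition callback, so there is a single subscription.
  func startWatching(
    _ availability: AsyncStream<Bool>,
    onTransitionToReady: @escaping @MainActor () -> Void
  ) {
    watcher?.cancel()
    self.onTransitionToReady = onTransitionToReady
    watcher = Task { [weak self] in
      for await isAvailable in availability {
        self?.ingest(isAvailable)
      }
    }
  }

  // Applies one reading from the stream.
  func ingest(_ isAvailable: Bool) {
    let previous = state
    state = isAvailable ? .ready : .unavailable
    guard state == .ready else { return }

    // Wake everything parked in awaitReady().
    let pending = waiters
    waiters.removeAll()
    for waiter in pending { waiter.resume() }

    // Only a close -> open transition fires the callback, not the first reading.
    if previous == .unavailable { onTransitionToReady?() }
  }

  func awaitReady() async {
    if state == .ready { return }
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
      waiters.append(continuation)
    }
  }

  // Pending waiters stay parked until the next .ready reading.
  func reset() {
    watcher?.cancel()
    watcher = nil
    onTransitionToReady = nil
    state = .unknown
  }
}
```

With this shape the collector would pass a closure that decides which entries to re-queue, and the gate never needs to see the permanent/temporary arrays.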
Proposal C — extract PickIndex
Pull pickIndex: [MediaGUID: CachedPodcastEntry], registerPicks(_:for:), unregisterPicks(of:), and removePick(mediaGUID:) into a PickIndex type owned by the collector.
API sketch:
@MainActor
final class PickIndex {
  func register(_ scored: [ScoredEpisode], for entry: CachedPodcastEntry)
  func unregister(of entry: CachedPodcastEntry)
  // Returns the entry whose picks held the GUID, or nil. Caller handles
  // entry-level cleanup (status flip to .exhausted).
  func remove(mediaGUID: MediaGUID) -> CachedPodcastEntry?
}
Rationale: the current invariant — "each scored episode lives in exactly one entry, so picks added here can never collide with another entry's index. Re-scoring an already-scored entry would, so unregister the old picks first" — is non-trivial and easy to skip. A dedicated type forces register to call unregister internally, and gives a place to assert/document the invariant. The .exhausted flip logic stays on the collector since it touches entry.status.
Tests can verify directly that register unregisters an entry's old picks before adding the new ones, without spinning up the full pipeline.
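A small sketch of how the extracted index could enforce the invariant, assuming simplified stand-ins for MediaGUID, ScoredEpisode, and CachedPodcastEntry (the real types carry more state). The reverse map guidsByEntry and the count property are illustrative additions, not part of the proposed API.

```swift
// Hypothetical stand-ins for the app's real types; only the shapes matter here.
typealias MediaGUID = String

struct ScoredEpisode {
  let mediaGUID: MediaGUID
  let score: Double
}

final class CachedPodcastEntry {
  let feedURL: String
  init(feedURL: String) { self.feedURL = feedURL }
}

@MainActor
final class PickIndex {
  // GUID -> owning entry, plus a reverse map so unregister does not scan.
  private var entryByGUID: [MediaGUID: CachedPodcastEntry] = [:]
  private var guidsByEntry: [ObjectIdentifier: Set<MediaGUID>] = [:]

  var count: Int { entryByGUID.count }

  func register(_ scored: [ScoredEpisode], for entry: CachedPodcastEntry) {
    // Re-scoring must not leave stale GUIDs behind, so always clear first.
    unregister(of: entry)
    var guids = Set<MediaGUID>()
    for episode in scored {
      // Invariant from the issue: a scored episode lives in exactly one entry.
      let owner = entryByGUID[episode.mediaGUID]
      assert(owner == nil || owner === entry, "GUID already owned by another entry")
      entryByGUID[episode.mediaGUID] = entry
      guids.insert(episode.mediaGUID)
    }
    guidsByEntry[ObjectIdentifier(entry)] = guids
  }

  func unregister(of entry: CachedPodcastEntry) {
    guard let guids = guidsByEntry.removeValue(forKey: ObjectIdentifier(entry)) else { return }
    for guid in guids { entryByGUID[guid] = nil }
  }

  // Returns the owning entry so the caller can handle the .exhausted flip.
  func remove(mediaGUID: MediaGUID) -> CachedPodcastEntry? {
    guard let entry = entryByGUID.removeValue(forKey: mediaGUID) else { return nil }
    guidsByEntry[ObjectIdentifier(entry)]?.remove(mediaGUID)
    return entry
  }
}
```

Calling register twice for the same entry leaves only the second batch of GUIDs in the index, which is the behavior a direct unit test would pin.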
Suggested order
Proposal A (mechanical, smallest blast radius)
Proposal C (small, makes invariant explicit)
Proposal B (touches the most interesting concurrency; now also collapses the two $scoringRevision subscriptions — do last so the short-circuit is in, which it now is)
Each can ship as its own PR. All three together should drop the collector from ~965 to ~500 lines while keeping the public surface (setActiveSource, recordSourcePodcasts, reset, removePick, picks(for:), bannerState(for:), bannerState, activeSource) unchanged. (visiblePicks is being deleted in PR #284 as a test-only surface, so the decomposition no longer needs to preserve it.)
Originated from code review on PR #284.
Update (2026-05-30, re-review of PR #284)
A second review of PR #284 (now at HEAD 51e6b7d4, after the original review that filed this) surfaced changes and insights relevant to the decomposition:
The drain layer was rewritten after this issue was filed. pendingDrainQueue + drainContinuation + waitForWork/dequeueNext + the manual bounded task group are gone (commits 0c48446b, 16efe8c5). The current shape is queued (backlog dedup) + inFlight + an AsyncStream<FeedURL> (queueContinuation) consumed by runDrainLoop into a withDiscardingTaskGroup. The "combines" list and Proposal A constraints above are updated to match. The biggest new source of complexity is in processFeedURL: the isFeedURLDetached / ObjectIdentifier detach guards and the two mid-fetch re-schedule "tails" that re-queue an entry a purge swapped in. Any drain extraction must carry these.
The scoring short-circuit (old "Finding 2") has landed (20dd6d46), but not as the original Proposal B note assumed; see the corrected interaction note in Proposal B. Net: two concurrent $scoringRevision.stream() subscriptions now exist, which makes Proposal B more valuable.
visiblePicks is being deleted in PR #284 as a test-only surface (no production caller). Removed from the "unchanged public surface" list.
isDiscoveryCandidate stays aligned with canonical Episode.candidate (correction). An earlier note here said PR #284 adds a cache check to the gate; that was reverted. Cached episodes remain discovery candidates (they still show in topRecommendations/UpNext). PR #284 instead makes caching/save-in-cache non-consuming by dropping their didPerformAction calls in ManagingEpisodes, so Proposal A lifts isDiscoveryCandidate unchanged.
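For orientation, the drain shape described in this update (queued + inFlight + an AsyncStream consumed into a withDiscardingTaskGroup) can be sketched roughly as follows. This is an illustration under assumptions, not the collector's code: FeedURL is a String stand-in, queued is a plain array instead of OrderedSet<FeedURL>, FeedDrain and markDone are hypothetical names, and the processFeedURL detach guards and re-schedule "tails" are left out.

```swift
typealias FeedURL = String  // hypothetical stand-in for the app's FeedURL

@MainActor
final class FeedDrain {
  private var queued: [FeedURL] = []      // backlog dedup (OrderedSet in the issue)
  private var inFlight: Set<FeedURL> = []
  private let stream: AsyncStream<FeedURL>
  private let queueContinuation: AsyncStream<FeedURL>.Continuation
  private let process: @Sendable (FeedURL) async -> Void

  init(process: @escaping @Sendable (FeedURL) async -> Void) {
    let (stream, continuation) = AsyncStream<FeedURL>.makeStream()
    self.stream = stream
    self.queueContinuation = continuation
    self.process = process
  }

  // Enqueue a feed unless it is already waiting or being processed.
  func schedule(_ url: FeedURL) {
    guard !queued.contains(url), !inFlight.contains(url) else { return }
    queued.append(url)
    queueContinuation.yield(url)
  }

  // Ends the stream so runDrainLoop returns once in-flight work completes.
  func finish() { queueContinuation.finish() }

  func runDrainLoop() async {
    await withDiscardingTaskGroup { group in
      for await url in stream {
        queued.removeAll { $0 == url }
        inFlight.insert(url)
        group.addTask { [process] in
          await process(url)
          await self.markDone(url)
        }
      }
    }
  }

  private func markDone(_ url: FeedURL) { inFlight.remove(url) }
}
```

The discarding group means finished child tasks are released as they complete, so the loop can run for the collector's lifetime without accumulating results.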