11import { logResponseError } from "./utils/responseError" ;
2- import {
3- BULK_QUEUE_FLUSH_DELAY_MS ,
4- BULK_QUEUE_MAX_SIZE ,
5- BULK_QUEUE_RETRY_BASE_DELAY_MS ,
6- BULK_QUEUE_RETRY_MAX_DELAY_MS ,
7- } from "./config" ;
2+ import { BULK_QUEUE_FLUSH_DELAY_MS , BULK_QUEUE_MAX_SIZE } from "./config" ;
83import { Logger } from "./logger" ;
94
10- const BULK_QUEUE_STORAGE_KEY = "__reflag_bulk_queue_v1" ;
11- const WARN_AFTER_CONSECUTIVE_FAILURES = 10 ;
12- const WARN_AFTER_FAILURE_MS = 5 * 60 * 1000 ;
13- const WARN_THROTTLE_MS = 15 * 60 * 1000 ;
145const DROP_ERROR_THROTTLE_MS = 15 * 60 * 1000 ;
156
167type PayloadContext = {
@@ -61,84 +52,19 @@ export type BulkEvent =
6152export type BulkQueueOptions = {
6253 flushDelayMs ?: number ;
6354 maxSize ?: number ;
64- retryBaseDelayMs ?: number ;
65- retryMaxDelayMs ?: number ;
66- storageKey ?: string ;
6755 logger ?: Logger ;
6856} ;
6957
70- function getSessionStorage ( ) : Storage | null {
71- try {
72- if ( typeof sessionStorage === "undefined" ) {
73- return null ;
74- }
75- return sessionStorage ;
76- } catch {
77- return null ;
78- }
79- }
80-
81- function isObject ( value : unknown ) : value is Record < string , unknown > {
82- return typeof value === "object" && value !== null ;
83- }
84-
85- function isBulkEvent ( value : unknown ) : value is BulkEvent {
86- if ( ! isObject ( value ) || typeof value . type !== "string" ) {
87- return false ;
88- }
89-
90- if ( value . type === "user" ) {
91- return typeof value . userId === "string" ;
92- }
93-
94- if ( value . type === "company" ) {
95- return typeof value . companyId === "string" ;
96- }
97-
98- if ( value . type === "event" ) {
99- return typeof value . userId === "string" && typeof value . event === "string" ;
100- }
101-
102- if ( value . type === "feature-flag-event" ) {
103- return (
104- typeof value . key === "string" &&
105- ( value . action === "check-is-enabled" || value . action === "check-config" )
106- ) ;
107- }
108-
109- if ( value . type === "prompt-event" ) {
110- return (
111- typeof value . featureId === "string" &&
112- typeof value . promptId === "string" &&
113- typeof value . userId === "string" &&
114- typeof value . promptedQuestion === "string" &&
115- ( value . action === "received" ||
116- value . action === "shown" ||
117- value . action === "dismissed" )
118- ) ;
119- }
120-
121- return false ;
122- }
123-
12458export class BulkQueue {
12559 private readonly flushDelayMs : number ;
12660 private readonly maxSize : number ;
127- private readonly retryBaseDelayMs : number ;
128- private readonly retryMaxDelayMs : number ;
129- private readonly storageKey : string ;
130- private readonly storage : Storage | null ;
13161 private readonly logger ?: Logger ;
13262 private readonly sendBulk : ( events : BulkEvent [ ] ) => Promise < Response > ;
13363
13464 private queue : BulkEvent [ ] = [ ] ;
13565 private timer : ReturnType < typeof setTimeout > | null = null ;
13666 private inFlightBatch : BulkEvent [ ] | null = null ;
137- private inFlightPromise : Promise < number | null > | null = null ;
138- private retryCount = 0 ;
139- private consecutiveFailures = 0 ;
140- private firstFailureAt : number | null = null ;
141- private lastWarnAt : number | null = null ;
67+ private inFlightPromise : Promise < void > | null = null ;
14268 private lastDropErrorAt : number | null = null ;
14369 private totalDroppedEvents = 0 ;
14470 private droppedSinceLastError = 0 ;
@@ -150,24 +76,12 @@ export class BulkQueue {
15076 this . sendBulk = sendBulk ;
15177 this . flushDelayMs = opts . flushDelayMs ?? BULK_QUEUE_FLUSH_DELAY_MS ;
15278 this . maxSize = opts . maxSize ?? BULK_QUEUE_MAX_SIZE ;
153- this . retryBaseDelayMs =
154- opts . retryBaseDelayMs ?? BULK_QUEUE_RETRY_BASE_DELAY_MS ;
155- this . retryMaxDelayMs =
156- opts . retryMaxDelayMs ?? BULK_QUEUE_RETRY_MAX_DELAY_MS ;
157- this . storageKey = opts . storageKey ?? BULK_QUEUE_STORAGE_KEY ;
158- this . storage = getSessionStorage ( ) ;
15979 this . logger = opts . logger ;
160-
161- this . restoreQueueFromStorage ( ) ;
162- if ( this . queue . length > 0 ) {
163- this . schedule ( this . flushDelayMs ) ;
164- }
16580 }
16681
16782 async enqueue ( event : BulkEvent ) {
16883 this . queue . push ( event ) ;
16984 this . trimPendingQueueToCapacity ( ) ;
170- this . persistQueueToStorage ( ) ;
17185
17286 const maxPending = Math . max ( 0 , this . maxSize - this . getInFlightBatchSize ( ) ) ;
17387 if ( this . queue . length > 0 && this . queue . length >= maxPending ) {
@@ -198,35 +112,24 @@ export class BulkQueue {
198112
199113 const sendPromise = this . sendBatch ( batch ) ;
200114 this . inFlightPromise = sendPromise ;
201- let nextDelayMs : number | null = null ;
202115 try {
203- nextDelayMs = await sendPromise ;
116+ await sendPromise ;
204117 } finally {
205118 if ( this . inFlightPromise === sendPromise ) {
206119 this . inFlightPromise = null ;
207120 }
208121 this . inFlightBatch = null ;
209- this . persistQueueToStorage ( ) ;
210122 }
211123
212- if ( this . queue . length > 0 && ! this . timer && nextDelayMs !== null ) {
213- this . schedule ( nextDelayMs ) ;
124+ if ( this . queue . length > 0 && ! this . timer ) {
125+ this . schedule ( this . flushDelayMs ) ;
214126 }
215127 }
216128
217129 async size ( ) {
218130 return this . queue . length + this . getInFlightBatchSize ( ) ;
219131 }
220132
221- private getRetryDelay ( ) {
222- const maxExponent = 6 ;
223- const exponent = Math . min ( this . retryCount - 1 , maxExponent ) ;
224- return Math . min (
225- this . retryBaseDelayMs * 2 ** exponent ,
226- this . retryMaxDelayMs ,
227- ) ;
228- }
229-
230133 private schedule ( delayMs : number ) {
231134 if ( this . timer || this . inFlightPromise || this . queue . length === 0 ) {
232135 return ;
@@ -244,130 +147,27 @@ export class BulkQueue {
244147 }
245148
246149 private async sendBatch ( batch : BulkEvent [ ] ) {
247- let nextDelayMs : number | null = null ;
248-
150+ let res : Response ;
249151 try {
250- const res = await this . sendBulk ( batch ) ;
251- if ( ! res . ok ) {
252- if ( res . status >= 400 && res . status < 500 ) {
253- this . retryCount = 0 ;
254- this . firstFailureAt = null ;
255- this . consecutiveFailures = 0 ;
256- this . lastWarnAt = null ;
257- if ( this . logger ) {
258- await logResponseError ( {
259- logger : this . logger ,
260- res,
261- message :
262- "bulk request failed with non-retriable status; dropping batch" ,
263- } ) ;
264- }
265- nextDelayMs = this . flushDelayMs ;
266- } else {
267- throw new Error ( `unexpected status ${ res . status } ` ) ;
268- }
269- } else {
270- this . retryCount = 0 ;
271- if ( this . firstFailureAt !== null && this . consecutiveFailures > 0 ) {
272- this . logger ?. info ( "bulk delivery recovered" , {
273- outageMs : Date . now ( ) - this . firstFailureAt ,
274- failedAttempts : this . consecutiveFailures ,
275- } ) ;
276- }
277- this . firstFailureAt = null ;
278- this . consecutiveFailures = 0 ;
279- this . lastWarnAt = null ;
280- nextDelayMs = this . flushDelayMs ;
281- }
152+ res = await this . sendBulk ( batch ) ;
282153 } catch ( error ) {
283- this . queue = batch . concat ( this . queue ) ;
284-
285- const now = Date . now ( ) ;
286- if ( this . firstFailureAt === null ) {
287- this . firstFailureAt = now ;
288- }
289- this . consecutiveFailures += 1 ;
290- this . retryCount += 1 ;
291- const retryInMs = this . getRetryDelay ( ) ;
292- nextDelayMs = retryInMs ;
293- this . logger ?. info ( "bulk retry scheduled" , {
294- retryInMs,
295- queueSize : this . queue . length + this . getInFlightBatchSize ( ) ,
296- consecutiveFailures : this . consecutiveFailures ,
154+ this . logger ?. error ( "bulk request failed; dropping batch" , {
155+ error,
156+ batchSize : batch . length ,
297157 } ) ;
298-
299- const outageMs = now - this . firstFailureAt ;
300- const shouldWarn =
301- this . consecutiveFailures >= WARN_AFTER_CONSECUTIVE_FAILURES ||
302- outageMs >= WARN_AFTER_FAILURE_MS ;
303- const canWarnNow =
304- this . lastWarnAt === null || now - this . lastWarnAt >= WARN_THROTTLE_MS ;
305- if ( shouldWarn && canWarnNow ) {
306- this . logger ?. warn ( "bulk delivery degraded" , {
307- consecutiveFailures : this . consecutiveFailures ,
308- outageMs,
309- queueSize : this . queue . length + this . getInFlightBatchSize ( ) ,
310- retryInMs,
311- error,
312- } ) ;
313- this . lastWarnAt = now ;
314- }
315- }
316-
317- return nextDelayMs ;
318- }
319-
320- private getPersistedQueue ( ) {
321- const inFlight = this . inFlightBatch ?? [ ] ;
322- return inFlight . concat ( this . queue ) . slice ( - this . maxSize ) ;
323- }
324-
325- private persistQueueToStorage ( ) {
326- if ( ! this . storage ) {
327158 return ;
328159 }
329160
330- try {
331- const persisted = this . getPersistedQueue ( ) ;
332- if ( persisted . length === 0 ) {
333- this . storage . removeItem ( this . storageKey ) ;
334- return ;
161+ if ( ! res . ok ) {
162+ if ( this . logger ) {
163+ await logResponseError ( {
164+ logger : this . logger ,
165+ res,
166+ message : "bulk request failed; dropping batch" ,
167+ } ) ;
335168 }
336-
337- this . storage . setItem ( this . storageKey , JSON . stringify ( persisted ) ) ;
338- } catch {
339- // ignore persistence failures
340- }
341- }
342-
343- private restoreQueueFromStorage ( ) {
344- if ( ! this . storage ) {
345169 return ;
346170 }
347-
348- try {
349- const raw = this . storage . getItem ( this . storageKey ) ;
350- if ( ! raw ) {
351- return ;
352- }
353-
354- const parsed : unknown = JSON . parse ( raw ) ;
355- if ( ! Array . isArray ( parsed ) ) {
356- throw new Error ( "invalid stored bulk queue" ) ;
357- }
358-
359- this . queue = parsed . filter ( isBulkEvent ) . slice ( - this . maxSize ) ;
360- if ( this . queue . length === 0 ) {
361- this . storage . removeItem ( this . storageKey ) ;
362- }
363- } catch {
364- this . queue = [ ] ;
365- try {
366- this . storage . removeItem ( this . storageKey ) ;
367- } catch {
368- // ignore cleanup failures
369- }
370- }
371171 }
372172
373173 private getInFlightBatchSize ( ) {
0 commit comments