From 68765aa8e3acfa38e64f230c546c92cb47ab7bbc Mon Sep 17 00:00:00 2001 From: 0x00 Date: Fri, 6 Mar 2026 17:27:36 +0800 Subject: [PATCH] Fix pool addition race and baseline raw parse panic --- core/baseline/baseline.go | 11 ++--------- core/pool/brutepool.go | 5 +++-- core/pool/pool.go | 32 ++++++++++++++++++++++++++------ 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/core/baseline/baseline.go b/core/baseline/baseline.go index eab3710..0eef220 100644 --- a/core/baseline/baseline.go +++ b/core/baseline/baseline.go @@ -51,17 +51,10 @@ func NewBaseline(u, host string, resp *ihttp.Response) *Baseline { } bl.Raw = append(bl.Header, bl.Body...) - bl.Response, err = pkg.ParseRawResponse(bl.Raw) - if err != nil { - bl.IsValid = false - bl.Reason = pkg.ErrResponseError.Error() - bl.ErrString = err.Error() - return bl - } - if r := bl.Response.Header.Get("Location"); r != "" { + if r := resp.GetHeader("Location"); r != "" { bl.RedirectURL = r } else { - bl.RedirectURL = bl.Response.Header.Get("location") + bl.RedirectURL = resp.GetHeader("location") } bl.Dir = bl.IsDir() diff --git a/core/pool/brutepool.go b/core/pool/brutepool.go index 5c1d955..5652572 100644 --- a/core/pool/brutepool.go +++ b/core/pool/brutepool.go @@ -678,8 +678,9 @@ func (pool *BrutePool) Close() { // 等待缓存的待处理任务完成 time.Sleep(time.Duration(100) * time.Millisecond) } - close(pool.additionCh) // 关闭addition管道 - //close(pool.checkCh) // 关闭check管道 + pool.additionClosed.Store(true) + // additionCh may still have async producers (redirect/crawl/retry/append); + // rely on closeCh/ctx to stop the consumer loop instead of closing the channel. pool.Statistor.EndTime = time.Now().Unix() pool.reqPool.Release() pool.scopePool.Release() diff --git a/core/pool/pool.go b/core/pool/pool.go index baab583..64b2bb4 100644 --- a/core/pool/pool.go +++ b/core/pool/pool.go @@ -21,12 +21,13 @@ type BasePool struct { ctx context.Context processCh chan *baseline.Baseline // 待处理的baseline - reqCount int - failedCount int - additionCh chan *Unit - closeCh chan struct{} - wg *sync.WaitGroup - isFallback atomic.Bool + reqCount int + failedCount int + additionCh chan *Unit + closeCh chan struct{} + wg *sync.WaitGroup + isFallback atomic.Bool + additionClosed atomic.Bool } func (pool *BasePool) doRetry(bl *baseline.Baseline) { @@ -44,12 +45,31 @@ func (pool *BasePool) doRetry(bl *baseline.Baseline) { } func (pool *BasePool) addAddition(u *Unit) { + if pool.ctx.Err() != nil || pool.additionClosed.Load() { + return + } + pool.wg.Add(1) + defer func() { + if recover() != nil { + pool.wg.Done() + } + }() + select { + case <-pool.ctx.Done(): + pool.wg.Done() + return case pool.additionCh <- u: + return default: // 强行屏蔽报错, 防止goroutine泄露 go func() { + defer func() { + if recover() != nil { + pool.wg.Done() + } + }() select { case pool.additionCh <- u: case <-pool.ctx.Done():