diff --git a/CHANGELOG.md b/CHANGELOG.md index e5dda59e4..4130a0bf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ The following emojis are used to highlight certain changes: ### Changed +- `provider`: legacy re/provider uses file-backed queue (cascadeq) instead of datastore-backed queue (go-dsqueue). This ensures, regardless of datastore implementation, that ordered retrieval of queued items is efficient and does not require reading all items into memory to sort them, which is possible if the datastore implementation does not provide efficient native ordered queries. + ### Removed ### Fixed diff --git a/examples/go.mod b/examples/go.mod index a15b43271..61eb799ce 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -39,8 +39,10 @@ require ( github.com/filecoin-project/go-clock v0.1.0 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect + github.com/gammazero/cascadeq v0.2.0 // indirect github.com/gammazero/chanqueue v1.1.2 // indirect github.com/gammazero/deque v1.2.1 // indirect + github.com/gammazero/fsutil v0.2.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/gopacket v1.1.19 // indirect @@ -53,7 +55,6 @@ require ( github.com/ipfs/bbloom v0.1.0 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-cidutil v0.1.1 // indirect - github.com/ipfs/go-dsqueue v0.2.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.4 // indirect github.com/ipfs/go-ipfs-redirects-file v0.1.2 // indirect github.com/ipfs/go-ipld-cbor v0.2.1 // indirect @@ -74,7 +75,7 @@ require ( github.com/libp2p/go-doh-resolver v0.5.0 // indirect github.com/libp2p/go-flow-metrics v0.3.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.40.0 // indirect + github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736 // indirect github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect github.com/libp2p/go-libp2p-record v0.3.1 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e // indirect @@ -123,7 +124,7 @@ require ( github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/quic-go/qpack v0.6.0 // indirect - github.com/quic-go/quic-go v0.59.0 // indirect + github.com/quic-go/quic-go v0.59.1 // indirect github.com/quic-go/webtransport-go v0.10.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect diff --git a/examples/go.sum b/examples/go.sum index 66e15e480..57d8d8073 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -52,10 +52,14 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/gammazero/cascadeq v0.2.0 h1:LHq6hWLQvQCHVtCl5hDWYSavqXRFCKZ8upCdRtsPpfo= +github.com/gammazero/cascadeq v0.2.0/go.mod h1:aEkjsO3wVhT09lheCjiKbEmT7GtGa03WXGmclOZp5wY= github.com/gammazero/chanqueue v1.1.2 h1:dZEsxlyANZMyeTRemABqZF8QM9BnE4NBI43Oh3y5fIU= github.com/gammazero/chanqueue v1.1.2/go.mod h1:XDN1X/jjAbmSceNFOQbtKToeSkxtdVdpKu90LiEdBEE= github.com/gammazero/deque v1.2.1 h1:9fnQVFCCZ9/NOc7ccTNqzoKd1tCWOqeI05/lPqFPMGQ= github.com/gammazero/deque v1.2.1/go.mod h1:5nSFkzVm+afG9+gy0VIowlqVAW4N8zNcMne+CMQVD2g= +github.com/gammazero/fsutil v0.2.0 h1:/MqQHCBoT07KGY75avKqG/SI0zS693zr1Ljx8cwKWhs= +github.com/gammazero/fsutil v0.2.0/go.mod h1:UhNgS1Hr75DBX6zqBEOB4AAZQiNytnr3Mc0ZpiLKPz4= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -76,8 +80,6 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e h1:4bw4WeyTYPp0smaXiJZCNnLrvVBqirQVreixayXezGc= -github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -119,10 +121,6 @@ github.com/ipfs/go-datastore v0.9.1 h1:67Po2epre/o0UxrmkzdS9ZTe2GFGODgTd2odx8Wh6 github.com/ipfs/go-datastore v0.9.1/go.mod h1:zi07Nvrpq1bQwSkEnx3bfjz+SQZbdbWyCNvyxMh9pN0= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ds-leveldb v0.5.2 h1:6nmxlQ2zbp4LCNdJVsmHfs9GP0eylfBNxpmY1csp0x0= -github.com/ipfs/go-ds-leveldb v0.5.2/go.mod h1:2fAwmcvD3WoRT72PzEekHBkQmBDhc39DJGoREiuGmYo= -github.com/ipfs/go-dsqueue v0.2.0 h1:MBi9w3oSiX98Xc+Y7NuJ9G8MI6mAT4IGdO9dHEMCZzU= -github.com/ipfs/go-dsqueue v0.2.0/go.mod h1:8FfNQC4DMF/KkzBXRNB9Rb3MKDW0Sh98HMtXYl1mLQE= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-pq v0.0.4 h1:U7jjENWJd1jhcrR8X/xHTaph14PTAK9O+yaLJbjqgOw= @@ -186,8 +184,8 @@ github.com/libp2p/go-libp2p v0.48.0 h1:h2BrLAgrj7X8bEN05K7qmrjpNHYA+6tnsGRdprjTn github.com/libp2p/go-libp2p v0.48.0/go.mod h1:Q1fBZNdmC2Hf82husCTfkKJVfHm2we5zk+NWmOGEmWk= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= -github.com/libp2p/go-libp2p-kad-dht v0.40.0 h1:as8U7Y1RX9CTKCBiFBHWKZ6tSS+rE+6WNz+H1+M+wbo= -github.com/libp2p/go-libp2p-kad-dht v0.40.0/go.mod h1:iLUjII47u3/HjxyhucI2lhsl29lrzlAs/ym16+H40jE= +github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736 h1:C721Ew/Xbjbw1Gk2kPExYP+cFG4CvVRGkW6P/H3Gc2Q= +github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736/go.mod h1:ViYBY1c0mezdQ9T5LQLNRNZZ2mx82xdH6s57487E/e8= github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s= github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg= @@ -302,8 +300,8 @@ github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEy github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= -github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SAw= -github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= +github.com/quic-go/quic-go v0.59.1 h1:0Gmua0HW1Tv7ANR7hUYwRyD0MG5OJfgvYSZasGZzBic= +github.com/quic-go/quic-go v0.59.1/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/quic-go/webtransport-go v0.10.0 h1:LqXXPOXuETY5Xe8ITdGisBzTYmUOy5eSj+9n4hLTjHI= github.com/quic-go/webtransport-go v0.10.0/go.mod h1:LeGIXr5BQKE3UsynwVBeQrU1TPrbh73MGoC6jd+V7ow= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -329,8 +327,6 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= -github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb h1:Ywfo8sUltxogBpFuMOFRrrSifO788kAFxmvVw31PtQQ= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb/go.mod h1:ikPs9bRWicNw3S7XpJ8sK/smGwU9WcSVU3dy9qahYBM= github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= diff --git a/go.mod b/go.mod index 6ce0e17f2..ab8e4a24e 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.13 + github.com/gammazero/cascadeq v0.2.0 github.com/gammazero/chanqueue v1.1.2 github.com/gammazero/deque v1.2.1 github.com/google/uuid v1.6.0 @@ -21,7 +22,6 @@ require ( github.com/ipfs/go-cidutil v0.1.1 github.com/ipfs/go-datastore v0.9.1 github.com/ipfs/go-detect-race v0.0.1 - github.com/ipfs/go-dsqueue v0.2.0 github.com/ipfs/go-ipfs-delay v0.0.1 github.com/ipfs/go-ipfs-redirects-file v0.1.2 github.com/ipfs/go-ipld-format v0.6.3 @@ -37,7 +37,7 @@ require ( github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-doh-resolver v0.5.0 github.com/libp2p/go-libp2p v0.48.0 - github.com/libp2p/go-libp2p-kad-dht v0.40.0 + github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736 github.com/libp2p/go-libp2p-record v0.3.1 github.com/libp2p/go-libp2p-routing-helpers v0.7.5 github.com/libp2p/go-libp2p-testing v0.12.0 @@ -89,6 +89,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/filecoin-project/go-clock v0.1.0 // indirect github.com/flynn/noise v1.1.0 // indirect + github.com/gammazero/fsutil v0.2.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/gopacket v1.1.19 // indirect @@ -141,7 +142,7 @@ require ( github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/quic-go/qpack v0.6.0 // indirect - github.com/quic-go/quic-go v0.59.0 // indirect + github.com/quic-go/quic-go v0.59.1 // indirect github.com/quic-go/webtransport-go v0.10.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect diff --git a/go.sum b/go.sum index ef46d7e4c..876281e62 100644 --- a/go.sum +++ b/go.sum @@ -52,10 +52,14 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/gammazero/cascadeq v0.2.0 h1:LHq6hWLQvQCHVtCl5hDWYSavqXRFCKZ8upCdRtsPpfo= +github.com/gammazero/cascadeq v0.2.0/go.mod h1:aEkjsO3wVhT09lheCjiKbEmT7GtGa03WXGmclOZp5wY= github.com/gammazero/chanqueue v1.1.2 h1:dZEsxlyANZMyeTRemABqZF8QM9BnE4NBI43Oh3y5fIU= github.com/gammazero/chanqueue v1.1.2/go.mod h1:XDN1X/jjAbmSceNFOQbtKToeSkxtdVdpKu90LiEdBEE= github.com/gammazero/deque v1.2.1 h1:9fnQVFCCZ9/NOc7ccTNqzoKd1tCWOqeI05/lPqFPMGQ= github.com/gammazero/deque v1.2.1/go.mod h1:5nSFkzVm+afG9+gy0VIowlqVAW4N8zNcMne+CMQVD2g= +github.com/gammazero/fsutil v0.2.0 h1:/MqQHCBoT07KGY75avKqG/SI0zS693zr1Ljx8cwKWhs= +github.com/gammazero/fsutil v0.2.0/go.mod h1:UhNgS1Hr75DBX6zqBEOB4AAZQiNytnr3Mc0ZpiLKPz4= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -77,8 +81,6 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e h1:4bw4WeyTYPp0smaXiJZCNnLrvVBqirQVreixayXezGc= -github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -120,10 +122,6 @@ github.com/ipfs/go-datastore v0.9.1 h1:67Po2epre/o0UxrmkzdS9ZTe2GFGODgTd2odx8Wh6 github.com/ipfs/go-datastore v0.9.1/go.mod h1:zi07Nvrpq1bQwSkEnx3bfjz+SQZbdbWyCNvyxMh9pN0= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ds-leveldb v0.5.2 h1:6nmxlQ2zbp4LCNdJVsmHfs9GP0eylfBNxpmY1csp0x0= -github.com/ipfs/go-ds-leveldb v0.5.2/go.mod h1:2fAwmcvD3WoRT72PzEekHBkQmBDhc39DJGoREiuGmYo= -github.com/ipfs/go-dsqueue v0.2.0 h1:MBi9w3oSiX98Xc+Y7NuJ9G8MI6mAT4IGdO9dHEMCZzU= -github.com/ipfs/go-dsqueue v0.2.0/go.mod h1:8FfNQC4DMF/KkzBXRNB9Rb3MKDW0Sh98HMtXYl1mLQE= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-pq v0.0.4 h1:U7jjENWJd1jhcrR8X/xHTaph14PTAK9O+yaLJbjqgOw= @@ -183,8 +181,8 @@ github.com/libp2p/go-libp2p v0.48.0 h1:h2BrLAgrj7X8bEN05K7qmrjpNHYA+6tnsGRdprjTn github.com/libp2p/go-libp2p v0.48.0/go.mod h1:Q1fBZNdmC2Hf82husCTfkKJVfHm2we5zk+NWmOGEmWk= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= -github.com/libp2p/go-libp2p-kad-dht v0.40.0 h1:as8U7Y1RX9CTKCBiFBHWKZ6tSS+rE+6WNz+H1+M+wbo= -github.com/libp2p/go-libp2p-kad-dht v0.40.0/go.mod h1:iLUjII47u3/HjxyhucI2lhsl29lrzlAs/ym16+H40jE= +github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736 h1:C721Ew/Xbjbw1Gk2kPExYP+cFG4CvVRGkW6P/H3Gc2Q= +github.com/libp2p/go-libp2p-kad-dht v0.41.1-0.20260703233757-7e03fc4a9736/go.mod h1:ViYBY1c0mezdQ9T5LQLNRNZZ2mx82xdH6s57487E/e8= github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s= github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg= @@ -299,8 +297,8 @@ github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEy github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= -github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SAw= -github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= +github.com/quic-go/quic-go v0.59.1 h1:0Gmua0HW1Tv7ANR7hUYwRyD0MG5OJfgvYSZasGZzBic= +github.com/quic-go/quic-go v0.59.1/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/quic-go/webtransport-go v0.10.0 h1:LqXXPOXuETY5Xe8ITdGisBzTYmUOy5eSj+9n4hLTjHI= github.com/quic-go/webtransport-go v0.10.0/go.mod h1:LeGIXr5BQKE3UsynwVBeQrU1TPrbh73MGoC6jd+V7ow= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -326,8 +324,6 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= -github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb h1:Ywfo8sUltxogBpFuMOFRrrSifO788kAFxmvVw31PtQQ= github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb/go.mod h1:ikPs9bRWicNw3S7XpJ8sK/smGwU9WcSVU3dy9qahYBM= github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= diff --git a/provider/noop.go b/provider/noop.go index 0150967e8..4a108554d 100644 --- a/provider/noop.go +++ b/provider/noop.go @@ -15,8 +15,8 @@ func NewNoopProvider() System { return &noopProvider{} } -func (op *noopProvider) Clear() int { - return 0 +func (op *noopProvider) Clear() error { + return nil } func (op *noopProvider) Close() error { diff --git a/provider/provider.go b/provider/provider.go index a52d77b26..45db6c62a 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -41,7 +41,7 @@ type Reprovider interface { type System interface { // Clear removes all entries from the provide queue. Returns the number of // CIDs removed from the queue. - Clear() int + Clear() error Close() error Stat() (ReproviderStats, error) SetKeyProvider(kp KeyChanFunc) diff --git a/provider/reprovider.go b/provider/reprovider.go index 698a393b0..f1934cace 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -5,15 +5,17 @@ import ( "errors" "fmt" "math" + "os" + "path/filepath" "strconv" "sync" "time" + "github.com/gammazero/cascadeq" "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-dsqueue" logging "github.com/ipfs/go-log/v2" metrics "github.com/ipfs/go-metrics-interface" "github.com/multiformats/go-multihash" @@ -39,10 +41,6 @@ const ( // batchReadSize is number of CIDs to read from provide queue in one visit. batchReadSize = 2048 - - // dedupCacheSize is the number of CIDs which deduplication is done across. - // Set to 0 is disable deduplication. - dedupCacheSize = 0 ) var log = logging.Logger("provider") @@ -65,7 +63,7 @@ type reprovider struct { keyProviderLock sync.RWMutex keyProvider KeyChanFunc - q *dsqueue.DSQueue + q *cascadeq.Queue ds datastore.Batching maxReprovideBatchSize uint @@ -87,6 +85,7 @@ type reprovider struct { throughputMinimumProvides uint keyPrefix datastore.Key + queueDir string } var _ System = (*reprovider)(nil) @@ -153,7 +152,14 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } s.ds = namespace.Wrap(ds, s.keyPrefix) - s.q = dsqueue.New(s.ds, "provide", dsqueue.WithDedupCacheSize(dedupCacheSize)) + + if s.queueDir == "" { + s.queueDir = filepath.Join(os.TempDir(), "providequeue") + } + s.q, err = cascadeq.New(s.queueDir) + if err != nil { + return nil, err + } // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options @@ -221,6 +227,18 @@ func ProvideWorkerCount(n int) Option { } } +// QueueDir configures the base directory where the providr queue subdirectory +// exists. If dir is set to "/tmp" then provide queue files are saved in the +// "/tmp/providequeue" subdirectory. +func QueueDir(dir string) Option { + return func(system *reprovider) error { + if dir != "" { + system.queueDir = dir + } + return nil + } +} + // MaxBatchSize limits how big each batch is. // // Some content routers like acceleratedDHTClient have sub linear scalling and @@ -353,15 +371,13 @@ func (s *reprovider) provideWorker() { provideOperation(s.ctx, c) } + buf := make([][]byte, batchReadSize) + for data := range s.q.Out() { provideCid(data) - buf, err := s.q.GetN(batchReadSize) - if err != nil { - log.Errorf("error fetching data from queue: %s", err) - continue - } - for _, data = range buf { - provideCid(data) + n := s.q.Drain(buf) + for i := range n { + provideCid(buf[i]) } } } @@ -434,9 +450,8 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -// Clear removes all entries from the provide queue. Returns the number of CIDs -// removed from the queue. -func (s *reprovider) Clear() int { +// Clear removes all entries from the provide queue. +func (s *reprovider) Clear() error { return s.q.Clear() } diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index 357af5747..69a3b767f 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -118,9 +118,11 @@ func testProvider(t *testing.T, singleProvide bool) { keysToProvide[i] = c } + qdir := t.TempDir() + var keyWait sync.Mutex keyWait.Lock() - batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + batchSystem, err := New(ds, QueueDir(qdir), Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) go func() { defer keyWait.Unlock() @@ -197,6 +199,7 @@ func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { if runtime.GOOS == "windows" { test.Flaky(t) } + // Don't run in Parallel as this test is time sensitive. someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) @@ -206,7 +209,8 @@ func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) // First public using an offline system to enqueue in the datastore. - sys, err := New(ds) + qdir := t.TempDir() + sys, err := New(ds, QueueDir(qdir)) assert.NoError(t, err) err = sys.Provide(context.Background(), c, true) @@ -217,7 +221,7 @@ func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { // Secondly restart an online datastore and we want to see this previously provided cid published. prov := &mockProvideMany{} - sys, err = New(ds, Online(prov), initialReprovideDelay(0)) + sys, err = New(ds, QueueDir(qdir), Online(prov), initialReprovideDelay(0)) assert.NoError(t, err) time.Sleep(time.Millisecond * 10) // give it time to call provider after that