Optionally use parallel consumers for bulk import indexing #1048

Closed

henrik242 wants to merge 3 commits into komoot:master from entur:henrik/import-perf-threading-only2

Conversation

@henrik242 (Contributor)

Use a specified number of parallel consumer threads for OpenSearch bulk indexing, ideally one per shard (= 5). Each thread has its own Importer instance with an independent BulkRequest buffer; all threads share the thread-safe OpenSearchClient.

Replaces the sentinel-based single-thread shutdown with a volatile flag + poll timeout for clean multi-thread termination.

Benchmark (Belgium 11GB, 4.9M docs): 221s -> 166s (25% faster).
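The flag-plus-timeout shutdown described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual ImportThread code; the queue type, the `processed` counter, and the class name are hypothetical stand-ins:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal sketch: each consumer polls the shared queue with a timeout and
// re-checks a volatile "finished" flag, so no per-thread sentinel objects
// are needed to terminate N consumers cleanly.
public class PollingConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private volatile boolean finished = false;
    private int processed = 0;  // stand-in for the per-thread Importer buffer

    public PollingConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void finish() { finished = true; }   // called once by the producer
    public int processed() { return processed; }

    @Override
    public void run() {
        try {
            while (true) {
                // Wake up regularly to re-check the flag instead of
                // blocking forever on take().
                String doc = queue.poll(100, TimeUnit.MILLISECONDS);
                if (doc == null) {
                    if (finished) break;  // producer done and queue drained
                    continue;
                }
                processed++;  // stand-in for importer.add(doc)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With one such consumer per shard, each thread can flush its own BulkRequest buffer independently while sharing a single client.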

@henrik242 force-pushed the henrik/import-perf-threading-only2 branch from 1f20ac0 to f976ba2 on April 10, 2026 06:56

@cursor (Bot) left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

Reviewed by Cursor Bugbot for commit f976ba2.

- Comment thread: src/main/java/de/komoot/photon/nominatim/ImportThread.java
- Comment thread: src/main/java/de/komoot/photon/App.java
Move the OpenSearch index refresh out of per-consumer Importer.finish()
and into a single call in App after all threads complete. Also guard
ObjectMapper serializer registration to run only once.
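The "run only once" guard mentioned in the commit message can be implemented with an atomic compare-and-set. The sketch below is a hypothetical illustration (the class and method names are invented here), not the actual App/Importer code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: an AtomicBoolean ensures one-time initialization even when
// several importer threads race through the same setup path.
public class SerializerSetup {
    private static final AtomicBoolean REGISTERED = new AtomicBoolean(false);
    static int registrations = 0;  // for illustration only

    static void registerOnce() {
        // compareAndSet flips false -> true for exactly one caller;
        // every later (or concurrent) caller sees false and skips the body.
        if (REGISTERED.compareAndSet(false, true)) {
            registrations++;  // stand-in for ObjectMapper module registration
        }
    }
}
```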
@henrik242 force-pushed the henrik/import-perf-threading-only2 branch from 26a9ac8 to 1be62d5 on April 14, 2026 07:06
@henrik242 (Contributor, Author)

henrik242 commented Apr 14, 2026

Belgium Import Benchmark Results

Dataset: belgium (4,954,824 documents, 12 GB uncompressed ndjson)
Java: OpenJDK 21.0.10, -Xmx4g
Machine: macOS Darwin 25.4.0 (Apple Silicon)

| Metric | Baseline (dad4572) | HEAD -j 1 | HEAD -j 5 | rescore -j 1 | rescore -j 5 |
|---|---|---|---|---|---|
| Processing time | 233s | 220s (-5.6%) | 155s (-33.5%) | 219s (-6.0%) | 145s (-37.8%) |
| Wall clock time | 3m 57s | 3m 45s (-5.1%) | 2m 41s (-32.2%) | 3m 44s (-5.5%) | 2m 30s (-36.7%) |
| Throughput (docs/s) | ~21,430 | ~22,580 (+5.4%) | ~32,010 (+49.4%) | ~22,630 (+5.6%) | ~34,170 (+59.4%) |
| Index size | 1.87 GB | 1.77 GB (-5.3%) | 2.00 GB (+6.9%) | 1.81 GB (-3.2%) | 2.02 GB (+8.0%) |
| CPU time (user) | 17m 46s | 16m 49s (-5.3%) | 21m 35s (+21.5%) | 17m 15s (-2.9%) | 21m 29s (+20.9%) |

EDIT: Tested with the rescore branch (#1051) on top too

@lonvia (Collaborator)

lonvia commented Apr 15, 2026

Now there is a new error when running a planet import with -j 5:

[2026-04-14T23:14:46,882][ERROR][d.k.p.n.ImportThread     ] Import error.
java.lang.RuntimeException: Error during bulk import
        at de.komoot.photon.opensearch.Importer.saveDocuments(Importer.java:90)
        at de.komoot.photon.opensearch.Importer.add(Importer.java:54)
        at de.komoot.photon.nominatim.ImportThread$ImportRunnable.runImport(ImportThread.java:127)
        at de.komoot.photon.nominatim.ImportThread$ImportRunnable.run(ImportThread.java:98)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
        at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: org.opensearch.client.transport.httpclient5.ResponseException: method [POST], host [http://127.0.0.1:9201], URI [/_bulk], status line [HTTP/1.1 429 Too Many Requests]
{"error":{"root_cause":[{"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [30978338154/28.8gb], which is larger than the limit of [30569765273/28.4gb], real usage: [30953963520/28.8gb], new bytes reserved: [24374634/23.2mb], usages [request=3014824/2.8mb, fielddata=0/0b, in_flight_requests=130543900/124.4mb]","bytes_wanted":30978338154,"bytes_limit":30569765273,"durability":"TRANSIENT"}],"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [30978338154/28.8gb], which is larger than the limit of [30569765273/28.4gb], real usage: [30953963520/28.8gb], new bytes reserved: [24374634/23.2mb], usages [request=3014824/2.8mb, fielddata=0/0b, in_flight_requests=130543900/124.4mb]","bytes_wanted":30978338154,"bytes_limit":30569765273,"durability":"TRANSIENT"},"status":429}
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport.extractAndWrapCause(ApacheHttpClient5Transport.java:1140)
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport.performRequest(ApacheHttpClient5Transport.java:159)
        at org.opensearch.client.opensearch.OpenSearchClient.bulk(OpenSearchClient.java:113)
        at de.komoot.photon.opensearch.Importer.saveDocuments(Importer.java:69)
        ... 6 more
Caused by: org.opensearch.client.transport.httpclient5.ResponseException: method [POST], host [http://127.0.0.1:9201], URI [/_bulk], status line [HTTP/1.1 429 Too Many Requests]
{"error":{"root_cause":[{"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [30978338154/28.8gb], which is larger than the limit of [30569765273/28.4gb], real usage: [30953963520/28.8gb], new bytes reserved: [24374634/23.2mb], usages [request=3014824/2.8mb, fielddata=0/0b, in_flight_requests=130543900/124.4mb]","bytes_wanted":30978338154,"bytes_limit":30569765273,"durability":"TRANSIENT"}],"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [30978338154/28.8gb], which is larger than the limit of [30569765273/28.4gb], real usage: [30953963520/28.8gb], new bytes reserved: [24374634/23.2mb], usages [request=3014824/2.8mb, fielddata=0/0b, in_flight_requests=130543900/124.4mb]","bytes_wanted":30978338154,"bytes_limit":30569765273,"durability":"TRANSIENT"},"status":429}
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport.convertResponse(ApacheHttpClient5Transport.java:315)
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport.access$400(ApacheHttpClient5Transport.java:106)
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport$1.completed(ApacheHttpClient5Transport.java:230)
        at org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport$1.completed(ApacheHttpClient5Transport.java:226)
        at org.apache.hc.core5.concurrent.BasicFuture.completed(BasicFuture.java:148)
        at org.apache.hc.core5.concurrent.ComplexFuture.completed(ComplexFuture.java:72)
        at org.apache.hc.client5.http.impl.async.InternalAbstractHttpAsyncClient$2$1.completed(InternalAbstractHttpAsyncClient.java:321)
        at org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer$1.completed(AbstractAsyncResponseConsumer.java:101)
        at org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityConsumer.completed(AbstractBinAsyncEntityConsumer.java:87)
        at org.apache.hc.core5.http.nio.entity.AbstractBinDataConsumer.streamEnd(AbstractBinDataConsumer.java:83)
        at org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer.streamEnd(AbstractAsyncResponseConsumer.java:142)
        at org.apache.hc.client5.http.async.methods.InflatingGzipDataConsumer.streamEnd(InflatingGzipDataConsumer.java:148)
        at org.apache.hc.client5.http.impl.async.HttpAsyncMainClientExec$1.streamEnd(HttpAsyncMainClientExec.java:283)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamHandler.dataEnd(ClientHttp1StreamHandler.java:285)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer.dataEnd(ClientHttp1StreamDuplexer.java:376)
        at org.apache.hc.core5.http.impl.nio.AbstractHttp1StreamDuplexer.onInput(AbstractHttp1StreamDuplexer.java:343)
        at org.apache.hc.core5.http.impl.nio.AbstractHttp1IOEventHandler.inputReady(AbstractHttp1IOEventHandler.java:64)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler.inputReady(ClientHttp1IOEventHandler.java:41)
        at org.apache.hc.core5.reactor.InternalDataChannel.onIOEvent(InternalDataChannel.java:139)
        at org.apache.hc.core5.reactor.InternalChannel.handleIOEvent(InternalChannel.java:51)
        at org.apache.hc.core5.reactor.SingleCoreIOReactor.processEvents(SingleCoreIOReactor.java:193)
        at org.apache.hc.core5.reactor.SingleCoreIOReactor.doExecute(SingleCoreIOReactor.java:140)
        at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:92)
        at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
        ... 1 more

I'll defer this PR until after the 1.1 release. It might be that the processing chain needs an overhaul before multi-threaded processing can work.

@henrik242 (Contributor, Author)

Closing this in favour of #1055 (which should address this problem using backpressure and retries)
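For context, backpressure with retries on a 429 typically means backing off and resending the same bulk request instead of failing the import. A rough sketch of that idea follows; `sendBulk` is a hypothetical stand-in for the real client call, and this is not the actual #1055 implementation:

```java
import java.util.function.IntSupplier;

// Sketch: retry a bulk request with exponential backoff while the server
// answers 429 Too Many Requests (e.g. the circuit_breaking_exception above).
public class BulkRetry {
    public static int sendWithRetry(IntSupplier sendBulk, int maxRetries)
            throws InterruptedException {
        long backoffMs = 50;
        for (int attempt = 0; ; attempt++) {
            int status = sendBulk.getAsInt();  // stand-in for client.bulk(...)
            if (status != 429 || attempt >= maxRetries) {
                return status;  // success, non-retryable error, or gave up
            }
            Thread.sleep(backoffMs);           // backpressure: wait before resending
            backoffMs = Math.min(backoffMs * 2, 5_000);
        }
    }
}
```

Capping the in-flight bulk bytes across all consumer threads would also keep the parent circuit breaker from tripping in the first place.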

@henrik242 closed this Apr 15, 2026