diff --git a/CHANGELOG.md b/CHANGELOG.md index fb06673..5e3ef29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - RowBinary: truncate NaiveDateTime resulting from DateTime64: https://github.com/plausible/ch/pull/297 +- drop chunked requests from `Ch.query/4` https://github.com/plausible/ch/pull/305 ## 0.7.1 (2026-01-15) diff --git a/README.md b/README.md index 8f146d4..74ff244 100644 --- a/README.md +++ b/README.md @@ -158,16 +158,17 @@ csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end) -chunked = Stream.chunk_every(stream, 100) -encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) -ten_encoded_chunks = Stream.take(encoded, 10) - -%Ch.Result{num_rows: 1000} = - Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false) +DBConnection.run(pid, fn conn -> + Stream.repeatedly(fn -> [:rand.uniform(100)] end) + |> Stream.chunk_every(100) + |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.take(10) + |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) + |> Stream.run() +end) ``` -This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage. +This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request. #### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) diff --git a/lib/ch.ex b/lib/ch.ex index 6f2f567..3c3c796 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -114,13 +114,6 @@ defmodule Ch do %Ch.Stream{conn: conn, query: query, params: params, opts: opts} end - # TODO drop - @doc false - @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any - def run(conn, f, opts \\ []) when is_function(f, 1) do - DBConnection.run(conn, f, opts) - end - if Code.ensure_loaded?(Ecto.ParameterizedType) do @behaviour Ecto.ParameterizedType diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index b53394a..a4b0fe3 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -320,14 +320,7 @@ defmodule Ch.Connection do path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) - else - request(conn, "POST", path, headers, body, opts) - end - - case result do + case request(conn, "POST", path, headers, body, opts) do {:ok, conn, responses} -> {:ok, query, responses, conn} {:error, _reason, _conn} = client_error -> client_error {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} @@ -369,33 +362,6 @@ defmodule Ch.Connection do end end - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - # stacktrace is a bit cleaner with this function inlined @compile inline: [send_request: 5] defp send_request(conn, method, path, headers, body) do diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 37e0596..04eeb8f 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -128,22 +128,15 @@ defimpl DBConnection.Query, for: Ch.Query do @spec encode(Query.t(), params, [Ch.query_option()]) :: {query_params, Mint.Types.headers(), body} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(), + when params: map | [term] | [row :: [term]] | iodata, query_params: [{String.t(), String.t()}], - body: iodata | Enumerable.t() + body: iodata - def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do - body = - case data do - _ when is_list(data) or is_binary(data) -> [statement, ?\n | data] - _ -> Stream.concat([[statement, ?\n]], data) - end - - {_query_params = [], headers(opts), body} - end - - def encode(%Query{command: :insert, statement: statement}, params, opts) do + def encode(%Query{command: :insert, encode: encode, statement: statement}, params, opts) do cond do + not encode -> + {query_params(params), headers(opts), statement} + names = Keyword.get(opts, :names) -> types = Keyword.fetch!(opts, :types) header = RowBinary.encode_names_and_types(names, types) diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index 57c48d3..6993913 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -341,13 +341,13 @@ defmodule Ch.ConnectionTest do end test "manual RowBinary", %{table: table} = ctx do - stmt = "insert into #{table}(a, b) format RowBinary" + stmt = "insert into #{table}(a, b) format RowBinary\n" types = ["UInt8", "String"] rows = [[1, "a"], [2, "b"]] data = RowBinary.encode_rows(rows, types) - parameterize_query!(ctx, stmt, data, encode: false) + parameterize_query!(ctx, [stmt | data], _params = %{}, encode: false) assert %{rows: rows} = parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table}) @@ -357,24 +357,28 @@ defmodule Ch.ConnectionTest do test "chunked", %{table: table} = ctx do types = ["UInt8", "String"] - rows = [[1, "a"], [2, "b"], [3, "c"]] + rows = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]] + + DBConnection.run(ctx.conn, fn conn -> + sink = + Ch.stream( + conn, + "insert into #{table}(a, b) format RowBinary\n", + _params = %{}, + Keyword.merge(ctx.query_options, encode: false) + ) - stream = rows |> Stream.chunk_every(2) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - - parameterize_query( - ctx, - "insert into #{table}(a, b) format RowBinary", - stream, - encode: false - ) + |> Stream.into(sink) + |> Stream.run() + end) assert {:ok, %{rows: rows}} = parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table}) - assert rows == [[1, "a"], [2, "b"], [3, "c"]] + assert rows == [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]] end test "select", %{table: table} = ctx do @@ -1334,8 +1338,8 @@ defmodule Ch.ConnectionTest do # weird thing about nullables is that, similar to bool, in binary format, any byte larger than 0 is `null` parameterize_query( ctx, - "insert into nullable format RowBinary", - <<1, 2, 3, 4, 5>>, + ["insert into nullable format RowBinary\n" | <<1, 2, 3, 4, 5>>], + _params = %{}, encode: false ) @@ -1708,7 +1712,7 @@ defmodule Ch.ConnectionTest do test "disconnects on early halt", ctx do logs = ExUnit.CaptureLog.capture_log(fn -> - Ch.run(ctx.conn, fn conn -> + DBConnection.run(ctx.conn, fn conn -> conn |> Ch.stream("select number from system.numbers") |> Enum.take(1) end) diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index bcc7457..ee71414 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -405,8 +405,7 @@ defmodule Ch.FaultsTest do test "reconnects after closed before streaming request", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - rows = [[1, 2], [3, 4]] - stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) + Process.flag(:trap_exit, true) log = capture_async_log(fn -> @@ -424,12 +423,20 @@ defmodule Ch.FaultsTest do insert = Task.async(fn -> - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) + DBConnection.run(conn, fn conn -> + sink = + Ch.stream( + conn, + "insert into unknown_table(a,b) format RowBinary", + _params = %{}, + Keyword.merge(query_options, encode: false) + ) + + (_rows = [[1, 2], [3, 4]]) + |> Stream.map(fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) + |> Stream.into(sink) + |> Stream.run() + end) end) # reconnect @@ -443,8 +450,8 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert {:error, %Ch.Error{code: 60, message: message}} = Task.await(insert) - assert message =~ ~r/UNKNOWN_TABLE/ + assert {{%DBConnection.ConnectionError{}, _stacktrace}, _task} = + catch_exit(Task.await(insert)) end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" @@ -453,9 +460,6 @@ defmodule Ch.FaultsTest do test "reconnects after closed while streaming request", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - rows = [[1, 2], [3, 4]] - stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) - log = capture_async_log(fn -> {:ok, conn} = Ch.start_link(database: Ch.Test.database(), port: port) @@ -469,12 +473,20 @@ defmodule Ch.FaultsTest do insert = Task.async(fn -> - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) + DBConnection.run(conn, fn conn -> + sink = + Ch.stream( + conn, + "insert into unknown_table(a,b) format RowBinary", + _params = %{}, + Keyword.merge(query_options, encode: false) + ) + + (_rows = [[1, 2], [3, 4]]) + |> Stream.map(fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) + |> Stream.into(sink) + |> Stream.run() + end) end) # close after first packet from mint arrives diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 06d95de..8dea6ce 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -68,14 +68,15 @@ defmodule Ch.StreamTest do |> Stream.chunk_every(100_000) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) |> Stream.take(10) - |> Enum.into( + |> Stream.into( Ch.stream( conn, - "insert into collect_stream(i) format RowBinary", + "insert into collect_stream(i) format RowBinary\n", _params = [], Keyword.merge(query_options, encode: false) ) ) + |> Stream.run() end) assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]