Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- RowBinary: truncate NaiveDateTime resulting from DateTime64: https://github.com/plausible/ch/pull/297
- drop chunked requests from `Ch.query/4`: https://github.com/plausible/ch/pull/305

## 0.7.1 (2026-01-15)

Expand Down
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,17 @@ csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
chunked = Stream.chunk_every(stream, 100)
encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
ten_encoded_chunks = Stream.take(encoded, 10)

%Ch.Result{num_rows: 1000} =
Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
DBConnection.run(pid, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
|> Stream.run()
end)
```

This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request.

#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

Expand Down
7 changes: 0 additions & 7 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,6 @@ defmodule Ch do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end

# TODO drop
@doc false
@spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
def run(conn, f, opts \\ []) when is_function(f, 1) do
DBConnection.run(conn, f, opts)
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType

Expand Down
36 changes: 1 addition & 35 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,7 @@ defmodule Ch.Connection do
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_function(body, 2) do
request_chunked(conn, "POST", path, headers, body, opts)
else
request(conn, "POST", path, headers, body, opts)
end

case result do
case request(conn, "POST", path, headers, body, opts) do
{:ok, conn, responses} -> {:ok, query, responses, conn}
{:error, _reason, _conn} = client_error -> client_error
{:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn}
Expand Down Expand Up @@ -369,33 +362,6 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
{:ok, conn} <- stream_body(conn, ref, stream),
do: receive_full_response(conn, timeout(conn, opts))
end

@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
{:ok, conn} | {:disconnect, Mint.Types.error(), conn}
defp stream_body(conn, ref, stream) do
result =
stream
|> Stream.concat([:eof])
|> Enum.reduce_while({:ok, conn}, fn
chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
_chunk, {:error, _conn, _reason} = error -> {:halt, error}
end)

case result do
{:ok, _conn} = ok -> ok
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end

# stacktrace is a bit cleaner with this function inlined
@compile inline: [send_request: 5]
defp send_request(conn, method, path, headers, body) do
Expand Down
19 changes: 6 additions & 13 deletions lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,15 @@ defimpl DBConnection.Query, for: Ch.Query do

@spec encode(Query.t(), params, [Ch.query_option()]) ::
{query_params, Mint.Types.headers(), body}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(),
when params: map | [term] | [row :: [term]] | iodata,
query_params: [{String.t(), String.t()}],
body: iodata | Enumerable.t()
body: iodata

def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do
body =
case data do
_ when is_list(data) or is_binary(data) -> [statement, ?\n | data]
_ -> Stream.concat([[statement, ?\n]], data)
end

{_query_params = [], headers(opts), body}
end

def encode(%Query{command: :insert, statement: statement}, params, opts) do
def encode(%Query{command: :insert, encode: encode, statement: statement}, params, opts) do
cond do
not encode ->
{query_params(params), headers(opts), statement}

names = Keyword.get(opts, :names) ->
types = Keyword.fetch!(opts, :types)
header = RowBinary.encode_names_and_types(names, types)
Expand Down
34 changes: 19 additions & 15 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,13 @@
end

test "manual RowBinary", %{table: table} = ctx do
stmt = "insert into #{table}(a, b) format RowBinary"
stmt = "insert into #{table}(a, b) format RowBinary\n"

types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"]]
data = RowBinary.encode_rows(rows, types)

parameterize_query!(ctx, stmt, data, encode: false)
parameterize_query!(ctx, [stmt | data], _params = %{}, encode: false)

assert %{rows: rows} =
parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -355,26 +355,30 @@
assert rows == [[1, "a"], [2, "b"]]
end

test "chunked", %{table: table} = ctx do

Check failure on line 358 in test/ch/connection_test.exs

View workflow job for this annotation

GitHub Actions / test (Elixir 1.15 / OTP 25 / ClickHouse latest / TZ UTC)

test insert chunked (Ch.ConnectionTest)
types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"], [3, "c"]]
rows = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]

DBConnection.run(ctx.conn, fn conn ->
sink =
Ch.stream(
conn,
"insert into #{table}(a, b) format RowBinary\n",
_params = %{},
Keyword.merge(ctx.query_options, encode: false)
)

stream =
rows
|> Stream.chunk_every(2)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

parameterize_query(
ctx,
"insert into #{table}(a, b) format RowBinary",
stream,
encode: false
)
|> Stream.into(sink)
|> Stream.run()
end)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})

assert rows == [[1, "a"], [2, "b"], [3, "c"]]
assert rows == [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]
end

test "select", %{table: table} = ctx do
Expand Down Expand Up @@ -1334,8 +1338,8 @@
# weird thing about nullables is that, similar to bool, in binary format, any byte larger than 0 is `null`
parameterize_query(
ctx,
"insert into nullable format RowBinary",
<<1, 2, 3, 4, 5>>,
["insert into nullable format RowBinary\n" | <<1, 2, 3, 4, 5>>],
_params = %{},
encode: false
)

Expand Down Expand Up @@ -1708,7 +1712,7 @@
test "disconnects on early halt", ctx do
logs =
ExUnit.CaptureLog.capture_log(fn ->
Ch.run(ctx.conn, fn conn ->
DBConnection.run(ctx.conn, fn conn ->
conn |> Ch.stream("select number from system.numbers") |> Enum.take(1)
end)

Expand Down
50 changes: 31 additions & 19 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,10 @@

# TODO non-chunked request

test "reconnects after closed before streaming request", ctx do

Check failure on line 405 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (Elixir 1.15 / OTP 25 / ClickHouse latest / TZ UTC)

test query reconnects after closed before streaming request (Ch.FaultsTest)
%{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx

rows = [[1, 2], [3, 4]]
stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end)
Process.flag(:trap_exit, true)

log =
capture_async_log(fn ->
Expand All @@ -424,12 +423,20 @@

insert =
Task.async(fn ->
Ch.query(
conn,
"insert into unknown_table(a,b) format RowBinary",
stream,
Keyword.merge(query_options, encode: false)
)
DBConnection.run(conn, fn conn ->
sink =
Ch.stream(
conn,
"insert into unknown_table(a,b) format RowBinary",
_params = %{},
Keyword.merge(query_options, encode: false)
)

(_rows = [[1, 2], [3, 4]])
|> Stream.map(fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end)
|> Stream.into(sink)
|> Stream.run()
end)
end)

# reconnect
Expand All @@ -443,19 +450,16 @@
:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

assert {:error, %Ch.Error{code: 60, message: message}} = Task.await(insert)
assert message =~ ~r/UNKNOWN_TABLE/
assert {{%DBConnection.ConnectionError{}, _stacktrace}, _task} =
catch_exit(Task.await(insert))
end)

assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end

test "reconnects after closed while streaming request", ctx do

Check failure on line 460 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (Elixir 1.15 / OTP 25 / ClickHouse latest / TZ UTC)

test query reconnects after closed while streaming request (Ch.FaultsTest)
%{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx

rows = [[1, 2], [3, 4]]
stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end)

log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(database: Ch.Test.database(), port: port)
Expand All @@ -469,12 +473,20 @@

insert =
Task.async(fn ->
Ch.query(
conn,
"insert into unknown_table(a,b) format RowBinary",
stream,
Keyword.merge(query_options, encode: false)
)
DBConnection.run(conn, fn conn ->
sink =
Ch.stream(
conn,
"insert into unknown_table(a,b) format RowBinary",
_params = %{},
Keyword.merge(query_options, encode: false)
)

(_rows = [[1, 2], [3, 4]])
|> Stream.map(fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end)
|> Stream.into(sink)
|> Stream.run()
end)
end)

# close after first packet from mint arrives
Expand Down
5 changes: 3 additions & 2 deletions test/ch/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ defmodule Ch.StreamTest do
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(
|> Stream.into(
Ch.stream(
conn,
"insert into collect_stream(i) format RowBinary",
"insert into collect_stream(i) format RowBinary\n",
_params = [],
Keyword.merge(query_options, encode: false)
)
)
|> Stream.run()
end)

assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]
Expand Down
Loading