Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- RowBinary: truncate NaiveDateTime resulting from DateTime64: https://github.com/plausible/ch/pull/297
- RowBinary: truncate NaiveDateTime resulting from DateTime64 https://github.com/plausible/ch/pull/297
- HTTP: drop Mint timeouts, rely on ownership duration instead https://github.com/plausible/ch/pull/301

## 0.7.1 (2026-01-15)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defaults = [
database: "default",
settings: [],
pool_size: 1,
timeout: :timer.seconds(15)
timeout: to_timeout(second: 15)
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
Expand Down
2 changes: 0 additions & 2 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP request/receive timeout in milliseconds
"""
@type common_option ::
{:database, String.t()}
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:timeout, timeout}

@typedoc """
Options for starting the connection pool.
Expand Down
76 changes: 44 additions & 32 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@ defmodule Ch.Connection do
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123

timeout =
opts[:timeout] || get_in(opts, [:transport_opts, :timeout]) || to_timeout(second: 15)

deadline = timeout_to_deadline(timeout)
mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])

with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
|> maybe_put_private(:settings, opts[:settings])

handshake = Query.build("select 1, version()")
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])
recv_timeout = deadline_to_timeout(deadline)

case handle_execute(handshake, params, _opts = [], conn) do
case handle_execute(handshake, params, _opts = [recv_timeout: recv_timeout], conn) do
Copy link
Copy Markdown
Collaborator Author

@ruslandoga ruslandoga Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can add receive_timeout option similar to Finch, and document how it's different from DBConnection's timeout. We still need receive_timeout in some places like this handshake and ping.

{:ok, handshake, responses, conn} ->
case DBConnection.Query.decode(handshake, responses, _opts = []) do
%Result{rows: [[1, version]]} ->
Expand Down Expand Up @@ -82,7 +87,7 @@ defmodule Ch.Connection do
def ping(conn) do
headers = [{"user-agent", @user_agent}]

case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
case request(conn, "GET", "/ping", headers, _body = [], _timeout = to_timeout(second: 5)) do
{:ok, conn, _response} -> {:ok, conn}
{:error, error, conn} -> {:disconnect, error, conn}
{:disconnect, _error, _conn} = disconnect -> disconnect
Expand Down Expand Up @@ -122,10 +127,9 @@ defmodule Ch.Connection do

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
timeout = timeout(conn, opts)

with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body),
{:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do
{:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, _timeout = :infinity) do
result = %Result{
command: command,
columns: columns,
Expand All @@ -144,24 +148,24 @@ defmodule Ch.Connection do

defp recv_declare(conn, decode, timeout) do
acc = %{decode: decode, step: :status, buffer: [], headers: []}
recv_declare_continue(conn, acc, timeout)
recv_declare_continue(conn, acc, timeout_to_deadline(timeout))
end

defp recv_declare_continue(conn, acc, timeout) do
case HTTP.recv(conn, 0, timeout) do
defp recv_declare_continue(conn, acc, deadline) do
case HTTP.recv(conn, 0, deadline_to_timeout(deadline)) do
{:ok, conn, responses} ->
case handle_recv_declare(responses, acc) do
{:ok, columns, headers, reader} ->
{:ok, conn, columns, headers, reader}

{:more, acc} ->
recv_declare_continue(conn, acc, timeout)
recv_declare_continue(conn, acc, deadline)

:error ->
all_responses_result =
case handle_all_responses(responses, []) do
{:ok, responses} -> {:ok, conn, responses}
{:more, acc} -> recv_all(conn, acc, timeout)
{:more, acc} -> recv_all(conn, acc, deadline)
end

with {:ok, conn, responses} <- all_responses_result do
Expand Down Expand Up @@ -255,9 +259,7 @@ defmodule Ch.Connection do
end

defp handle_fetch_recv(query, result, opts, conn, reader) do
timeout = timeout(conn, opts)

case HTTP.recv(conn, 0, timeout) do
case HTTP.recv(conn, 0, :infinity) do
{:ok, conn, responses} ->
reader = %{reader | responses: responses}
handle_fetch(query, result, opts, {conn, reader})
Expand Down Expand Up @@ -296,12 +298,12 @@ defmodule Ch.Connection do
end
end

def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
def handle_execute(%Query{} = query, {:stream, ref, body}, _opts, conn) do
case HTTP.stream_request_body(conn, ref, body) do
{:ok, conn} ->
case body do
:eof ->
with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
with {:ok, conn, responses} <- receive_full_response(conn, _timeout = :infinity) do
{:ok, query, responses, conn}
end

Expand All @@ -322,9 +324,9 @@ defmodule Ch.Connection do

result =
if is_function(body, 2) do
request_chunked(conn, "POST", path, headers, body, opts)
request_chunked(conn, "POST", path, headers, body, _timeout = :infinity)
else
request(conn, "POST", path, headers, body, opts)
request(conn, "POST", path, headers, body, _timeout = :infinity)
end

case result do
Expand All @@ -339,8 +341,9 @@ defmodule Ch.Connection do

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
timeout = opts[:recv_timeout] || :infinity

case request(conn, "POST", path, headers, body, opts) do
case request(conn, "POST", path, headers, body, timeout) do
{:ok, conn, responses} -> {:ok, query, responses, conn}
{:error, _reason, _conn} = client_error -> client_error
{:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn}
Expand All @@ -359,24 +362,24 @@ defmodule Ch.Connection do

@typep response :: Mint.Types.status() | Mint.Types.headers() | binary

@spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) ::
@spec request(conn, binary, binary, Mint.Types.headers(), iodata, timeout) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp request(conn, method, path, headers, body, opts) do
defp request(conn, method, path, headers, body, timeout) do
with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do
receive_full_response(conn, timeout(conn, opts))
receive_full_response(conn, timeout)
end
end

@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), timeout) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
def request_chunked(conn, method, path, headers, stream, timeout) do
with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
{:ok, conn} <- stream_body(conn, ref, stream),
do: receive_full_response(conn, timeout(conn, opts))
do: receive_full_response(conn, timeout)
end

@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
Expand Down Expand Up @@ -410,7 +413,7 @@ defmodule Ch.Connection do
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp receive_full_response(conn, timeout) do
with {:ok, conn, responses} <- recv_all(conn, [], timeout) do
with {:ok, conn, responses} <- recv_all(conn, [], timeout_to_deadline(timeout)) do
case responses do
[200, headers | _rest] ->
conn = ensure_same_server(conn, headers)
Expand All @@ -429,14 +432,14 @@ defmodule Ch.Connection do
end
end

@spec recv_all(conn, [response], timeout()) ::
@spec recv_all(conn, [response], non_neg_integer | nil) ::
{:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn}
defp recv_all(conn, acc, timeout) do
case HTTP.recv(conn, 0, timeout) do
defp recv_all(conn, acc, deadline) do
case HTTP.recv(conn, 0, deadline_to_timeout(deadline)) do
{:ok, conn, responses} ->
case handle_all_responses(responses, acc) do
{:ok, responses} -> {:ok, conn, responses}
{:more, acc} -> recv_all(conn, acc, timeout)
{:more, acc} -> recv_all(conn, acc, deadline)
end

{:error, conn, reason, _responses} ->
Expand All @@ -453,12 +456,21 @@ defmodule Ch.Connection do
defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)}
defp handle_all_responses([], acc), do: {:more, acc}

defp timeout_to_deadline(:infinity), do: nil

defp timeout_to_deadline(timeout) do
System.monotonic_time(:millisecond) + timeout
end

defp deadline_to_timeout(nil), do: :infinity

defp deadline_to_timeout(deadline) do
max(0, deadline - System.monotonic_time(:millisecond))
end

defp maybe_put_private(conn, _k, nil), do: conn
defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v)

defp timeout(conn), do: HTTP.get_private(conn, :timeout)
defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn)

defp settings(conn, opts) do
default_settings = HTTP.get_private(conn, :settings, [])
opts_settings = Keyword.get(opts, :settings, [])
Expand Down
7 changes: 4 additions & 3 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1631,11 +1631,12 @@ defmodule Ch.ConnectionTest do
end

describe "options" do
# this test is flaky, sometimes it raises due to ownership timeout
@tag capture_log: true, skip: true
@tag capture_log: true
test "can provide custom timeout", ctx do
assert {:error, %Mint.TransportError{reason: :timeout} = error} =
parameterize_query(ctx, "select sleep(1)", _params = [], timeout: 100)
parameterize_query(ctx, "select sleep(1)", _params = [],
timeout: to_timeout(millisecond: 100)
)

assert Exception.message(error) == "timeout"
end
Expand Down
6 changes: 4 additions & 2 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ defmodule Ch.FaultsTest do
} do
log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(port: port, timeout: 100)
{:ok, conn} = Ch.start_link(port: port)

# connect
{:ok, mint} = :gen_tcp.accept(listen)
Expand All @@ -277,6 +277,7 @@ defmodule Ch.FaultsTest do

select =
Task.async(fn ->
query_options = Keyword.put(query_options, :timeout, 100)
Ch.query(conn, "select 1 + 1", [], query_options)
end)

Expand All @@ -298,7 +299,8 @@ defmodule Ch.FaultsTest do
assert {:ok, %Ch.Result{rows: [[2]]}} = Task.await(select)
end)

assert log =~ "disconnected: ** (Mint.TransportError) timeout"
assert log =~
"timed out because it queued and checked out the connection for longer than 100ms"
end

test "reconnects after closed on response", ctx do
Expand Down
Loading