From c49f0b8ae1f6636c0bcdcf81fff4d4917bffb9c3 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 25 Mar 2026 23:10:54 +0300 Subject: [PATCH 1/3] drop HTTP recv timeout --- CHANGELOG.md | 1 + README.md | 2 +- lib/ch.ex | 2 -- lib/ch/connection.ex | 61 ++++++++++++++++--------------------- test/ch/connection_test.exs | 7 +++-- 5 files changed, 33 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fb06673..218c9a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - RowBinary: truncate NaiveDateTime resulting from DateTime64: https://github.com/plausible/ch/pull/297 +- HTTP: drop Mint timeouts, rely on ownership duration instead ## 0.7.1 (2026-01-15) diff --git a/README.md b/README.md index 8f146d4..6f02df9 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ defaults = [ database: "default", settings: [], pool_size: 1, - timeout: :timer.seconds(15) + timeout: to_timeout(second: 15) ] # note that starting in ClickHouse 25.1.3.23 `default` user doesn't have diff --git a/lib/ch.ex b/lib/ch.ex index 6f2f567..ea6585f 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -9,14 +9,12 @@ defmodule Ch do * `:username` - Username * `:password` - User password * `:settings` - Keyword list of ClickHouse settings - * `:timeout` - HTTP request/receive timeout in milliseconds """ @type common_option :: {:database, String.t()} | {:username, String.t()} | {:password, String.t()} | {:settings, Keyword.t()} - | {:timeout, timeout} @typedoc """ Options for starting the connection pool. diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index b53394a..dbab3f0 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -20,7 +20,6 @@ defmodule Ch.Connection do with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do conn = conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) |> maybe_put_private(:database, opts[:database]) |> maybe_put_private(:username, opts[:username]) |> maybe_put_private(:password, opts[:password]) @@ -82,7 +81,7 @@ defmodule Ch.Connection do def ping(conn) do headers = [{"user-agent", @user_agent}] - case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do + case request(conn, "GET", "/ping", headers, _body = []) do {:ok, conn, _response} -> {:ok, conn} {:error, error, conn} -> {:disconnect, error, conn} {:disconnect, _error, _conn} = disconnect -> disconnect @@ -122,10 +121,9 @@ defmodule Ch.Connection do path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - timeout = timeout(conn, opts) with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), - {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do + {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode) do result = %Result{ command: command, columns: columns, @@ -142,26 +140,26 @@ defmodule Ch.Connection do end end - defp recv_declare(conn, decode, timeout) do + defp recv_declare(conn, decode) do acc = %{decode: decode, step: :status, buffer: [], headers: []} - recv_declare_continue(conn, acc, timeout) + recv_declare_continue(conn, acc) end - defp recv_declare_continue(conn, acc, timeout) do - case HTTP.recv(conn, 0, timeout) do + defp recv_declare_continue(conn, acc) do + case HTTP.recv(conn, 0, :infinity) do {:ok, conn, responses} -> case handle_recv_declare(responses, acc) do {:ok, columns, headers, reader} -> {:ok, conn, columns, headers, reader} {:more, acc} -> - recv_declare_continue(conn, acc, timeout) + recv_declare_continue(conn, acc) :error -> all_responses_result = case handle_all_responses(responses, []) do {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc, timeout) + {:more, acc} -> recv_all(conn, acc) end with {:ok, conn, responses} <- all_responses_result do @@ -255,9 +253,7 @@ defmodule Ch.Connection do end defp handle_fetch_recv(query, result, opts, conn, reader) do - timeout = timeout(conn, opts) - - case HTTP.recv(conn, 0, timeout) do + case HTTP.recv(conn, 0, :infinity) do {:ok, conn, responses} -> reader = %{reader | responses: responses} handle_fetch(query, result, opts, {conn, reader}) @@ -296,12 +292,12 @@ defmodule Ch.Connection do end end - def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do + def handle_execute(%Query{} = query, {:stream, ref, body}, _opts, conn) do case HTTP.stream_request_body(conn, ref, body) do {:ok, conn} -> case body do :eof -> - with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do + with {:ok, conn, responses} <- receive_full_response(conn) do {:ok, query, responses, conn} end @@ -322,9 +318,9 @@ defmodule Ch.Connection do result = if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) + request_chunked(conn, "POST", path, headers, body) else - request(conn, "POST", path, headers, body, opts) + request(conn, "POST", path, headers, body) end case result do @@ -340,7 +336,7 @@ defmodule Ch.Connection do path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - case request(conn, "POST", path, headers, body, opts) do + case request(conn, "POST", path, headers, body) do {:ok, conn, responses} -> {:ok, query, responses, conn} {:error, _reason, _conn} = client_error -> client_error {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} @@ -359,24 +355,24 @@ defmodule Ch.Connection do @typep response :: Mint.Types.status() | Mint.Types.headers() | binary - @spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) :: + @spec request(conn, binary, binary, Mint.Types.headers(), iodata) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - defp request(conn, method, path, headers, body, opts) do + defp request(conn, method, path, headers, body) do with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do - receive_full_response(conn, timeout(conn, opts)) + receive_full_response(conn) end end - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) :: + @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t()) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do + def request_chunked(conn, method, path, headers, stream) do with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) + do: receive_full_response(conn) end @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: @@ -405,12 +401,12 @@ defmodule Ch.Connection do end end - @spec receive_full_response(conn, timeout) :: + @spec receive_full_response(conn) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - defp receive_full_response(conn, timeout) do - with {:ok, conn, responses} <- recv_all(conn, [], timeout) do + defp receive_full_response(conn) do + with {:ok, conn, responses} <- recv_all(conn, []) do case responses do [200, headers | _rest] -> conn = ensure_same_server(conn, headers) @@ -429,14 +425,14 @@ defmodule Ch.Connection do end end - @spec recv_all(conn, [response], timeout()) :: + @spec recv_all(conn, [response]) :: {:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn} - defp recv_all(conn, acc, timeout) do - case HTTP.recv(conn, 0, timeout) do + defp recv_all(conn, acc) do + case HTTP.recv(conn, 0, :infinity) do {:ok, conn, responses} -> case handle_all_responses(responses, acc) do {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc, timeout) + {:more, acc} -> recv_all(conn, acc) end {:error, conn, reason, _responses} -> @@ -456,9 +452,6 @@ defmodule Ch.Connection do defp maybe_put_private(conn, _k, nil), do: conn defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v) - defp timeout(conn), do: HTTP.get_private(conn, :timeout) - defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn) - defp settings(conn, opts) do default_settings = HTTP.get_private(conn, :settings, []) opts_settings = Keyword.get(opts, :settings, []) diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index 57c48d3..e9399e9 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -1631,11 +1631,12 @@ defmodule Ch.ConnectionTest do end describe "options" do - # this test is flaky, sometimes it raises due to ownership timeout - @tag capture_log: true, skip: true + @tag capture_log: true test "can provide custom timeout", ctx do assert {:error, %Mint.TransportError{reason: :timeout} = error} = - parameterize_query(ctx, "select sleep(1)", _params = [], timeout: 100) + parameterize_query(ctx, "select sleep(1)", _params = [], + timeout: to_timeout(millisecond: 100) + ) assert Exception.message(error) == "timeout" end From 3700f67b44e8fa7529c964636382bb4a9f4b0c33 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 25 Mar 2026 23:12:51 +0300 Subject: [PATCH 2/3] changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 218c9a4..74afd89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,8 @@ ## Unreleased -- RowBinary: truncate NaiveDateTime resulting from DateTime64: https://github.com/plausible/ch/pull/297 -- HTTP: drop Mint timeouts, rely on ownership duration instead +- RowBinary: truncate NaiveDateTime resulting from DateTime64 https://github.com/plausible/ch/pull/297 +- HTTP: drop Mint timeouts, rely on ownership duration instead https://github.com/plausible/ch/pull/301 ## 0.7.1 (2026-01-15) From 939c102178be9d6999a2bcf4b8c0bf11d99de83d Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 25 Mar 2026 23:35:16 +0300 Subject: [PATCH 3/3] continue --- lib/ch/connection.ex | 71 ++++++++++++++++++++++++++--------------- test/ch/faults_test.exs | 6 ++-- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index dbab3f0..364d27c 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -15,6 +15,11 @@ defmodule Ch.Connection do scheme = String.to_existing_atom(opts[:scheme] || "http") address = opts[:hostname] || "localhost" port = opts[:port] || 8123 + + timeout = + opts[:timeout] || get_in(opts, [:transport_opts, :timeout]) || to_timeout(second: 15) + + deadline = timeout_to_deadline(timeout) mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do @@ -27,8 +32,9 @@ defmodule Ch.Connection do handshake = Query.build("select 1, version()") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) + recv_timeout = deadline_to_timeout(deadline) - case handle_execute(handshake, params, _opts = [], conn) do + case handle_execute(handshake, params, _opts = [recv_timeout: recv_timeout], conn) do {:ok, handshake, responses, conn} -> case DBConnection.Query.decode(handshake, responses, _opts = []) do %Result{rows: [[1, version]]} -> @@ -81,7 +87,7 @@ defmodule Ch.Connection do def ping(conn) do headers = [{"user-agent", @user_agent}] - case request(conn, "GET", "/ping", headers, _body = []) do + case request(conn, "GET", "/ping", headers, _body = [], _timeout = to_timeout(second: 5)) do {:ok, conn, _response} -> {:ok, conn} {:error, error, conn} -> {:disconnect, error, conn} {:disconnect, _error, _conn} = disconnect -> disconnect @@ -123,7 +129,7 @@ defmodule Ch.Connection do headers = headers(conn, extra_headers, opts) with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), - {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode) do + {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, _timeout = :infinity) do result = %Result{ command: command, columns: columns, @@ -140,26 +146,26 @@ defmodule Ch.Connection do end end - defp recv_declare(conn, decode) do + defp recv_declare(conn, decode, timeout) do acc = %{decode: decode, step: :status, buffer: [], headers: []} - recv_declare_continue(conn, acc) + recv_declare_continue(conn, acc, timeout_to_deadline(timeout)) end - defp recv_declare_continue(conn, acc) do - case HTTP.recv(conn, 0, :infinity) do + defp recv_declare_continue(conn, acc, deadline) do + case HTTP.recv(conn, 0, deadline_to_timeout(deadline)) do {:ok, conn, responses} -> case handle_recv_declare(responses, acc) do {:ok, columns, headers, reader} -> {:ok, conn, columns, headers, reader} {:more, acc} -> - recv_declare_continue(conn, acc) + recv_declare_continue(conn, acc, deadline) :error -> all_responses_result = case handle_all_responses(responses, []) do {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc) + {:more, acc} -> recv_all(conn, acc, deadline) end with {:ok, conn, responses} <- all_responses_result do @@ -297,7 +303,7 @@ defmodule Ch.Connection do {:ok, conn} -> case body do :eof -> - with {:ok, conn, responses} <- receive_full_response(conn) do + with {:ok, conn, responses} <- receive_full_response(conn, _timeout = :infinity) do {:ok, query, responses, conn} end @@ -318,9 +324,9 @@ defmodule Ch.Connection do result = if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body) + request_chunked(conn, "POST", path, headers, body, _timeout = :infinity) else - request(conn, "POST", path, headers, body) + request(conn, "POST", path, headers, body, _timeout = :infinity) end case result do @@ -335,8 +341,9 @@ defmodule Ch.Connection do path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) + timeout = opts[:recv_timeout] || :infinity - case request(conn, "POST", path, headers, body) do + case request(conn, "POST", path, headers, body, timeout) do {:ok, conn, responses} -> {:ok, query, responses, conn} {:error, _reason, _conn} = client_error -> client_error {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} @@ -355,24 +362,24 @@ defmodule Ch.Connection do @typep response :: Mint.Types.status() | Mint.Types.headers() | binary - @spec request(conn, binary, binary, Mint.Types.headers(), iodata) :: + @spec request(conn, binary, binary, Mint.Types.headers(), iodata, timeout) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - defp request(conn, method, path, headers, body) do + defp request(conn, method, path, headers, body, timeout) do with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do - receive_full_response(conn) + receive_full_response(conn, timeout) end end - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t()) :: + @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), timeout) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream) do + def request_chunked(conn, method, path, headers, stream, timeout) do with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn) + do: receive_full_response(conn, timeout) end @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: @@ -401,12 +408,12 @@ defmodule Ch.Connection do end end - @spec receive_full_response(conn) :: + @spec receive_full_response(conn, timeout) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - defp receive_full_response(conn) do - with {:ok, conn, responses} <- recv_all(conn, []) do + defp receive_full_response(conn, timeout) do + with {:ok, conn, responses} <- recv_all(conn, [], timeout_to_deadline(timeout)) do case responses do [200, headers | _rest] -> conn = ensure_same_server(conn, headers) @@ -425,14 +432,14 @@ defmodule Ch.Connection do end end - @spec recv_all(conn, [response]) :: + @spec recv_all(conn, [response], non_neg_integer | nil) :: {:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn} - defp recv_all(conn, acc) do - case HTTP.recv(conn, 0, :infinity) do + defp recv_all(conn, acc, deadline) do + case HTTP.recv(conn, 0, deadline_to_timeout(deadline)) do {:ok, conn, responses} -> case handle_all_responses(responses, acc) do {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc) + {:more, acc} -> recv_all(conn, acc, deadline) end {:error, conn, reason, _responses} -> @@ -449,6 +456,18 @@ defmodule Ch.Connection do defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)} defp handle_all_responses([], acc), do: {:more, acc} + defp timeout_to_deadline(:infinity), do: nil + + defp timeout_to_deadline(timeout) do + System.monotonic_time(:millisecond) + timeout + end + + defp deadline_to_timeout(nil), do: :infinity + + defp deadline_to_timeout(deadline) do + max(0, deadline - System.monotonic_time(:millisecond)) + end + defp maybe_put_private(conn, _k, nil), do: conn defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v) diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index bcc7457..621f0cd 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -266,7 +266,7 @@ defmodule Ch.FaultsTest do } do log = capture_async_log(fn -> - {:ok, conn} = Ch.start_link(port: port, timeout: 100) + {:ok, conn} = Ch.start_link(port: port) # connect {:ok, mint} = :gen_tcp.accept(listen) @@ -277,6 +277,7 @@ defmodule Ch.FaultsTest do select = Task.async(fn -> + query_options = Keyword.put(query_options, :timeout, 100) Ch.query(conn, "select 1 + 1", [], query_options) end) @@ -298,7 +299,8 @@ defmodule Ch.FaultsTest do assert {:ok, %Ch.Result{rows: [[2]]}} = Task.await(select) end) - assert log =~ "disconnected: ** (Mint.TransportError) timeout" + assert log =~ + "timed out because it queued and checked out the connection for longer than 100ms" end test "reconnects after closed on response", ctx do