From c488399325fe380d34a8e338784ad5720963d680 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 8 Apr 2026 17:59:13 +0300 Subject: [PATCH 1/3] reset --- README.md | 294 +----------------------- bench/compress.exs | 1 + lib/ch.ex | 120 ---------- lib/ch/connection.ex | 527 ------------------------------------------- lib/ch/http.ex | 3 + lib/ch/pool.ex | 39 ++++ lib/ch/query.ex | 420 ---------------------------------- lib/ch/result.ex | 28 --- lib/ch/row_binary.ex | 4 +- lib/ch/stream.ex | 43 ---- lib/ch/telemetry.ex | 23 ++ mix.exs | 11 +- mix.lock | 7 +- test/support/help.ex | 203 +++++++++++++++++ test/test_helper.exs | 36 +-- 15 files changed, 287 insertions(+), 1472 deletions(-) create mode 100644 bench/compress.exs delete mode 100644 lib/ch/connection.ex create mode 100644 lib/ch/http.ex create mode 100644 lib/ch/pool.ex delete mode 100644 lib/ch/query.ex delete mode 100644 lib/ch/result.ex delete mode 100644 lib/ch/stream.ex create mode 100644 lib/ch/telemetry.ex create mode 100644 test/support/help.ex diff --git a/README.md b/README.md index 8f146d44..c0b33c89 100644 --- a/README.md +++ b/README.md @@ -7,309 +7,19 @@ Minimal HTTP [ClickHouse](https://clickhouse.com) client for Elixir. 
Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch) -### Key features - -- RowBinary -- Native query parameters -- Per query settings -- Minimal API - -Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82) - ## Installation ```elixir defp deps do [ - {:ch, "~> 0.7.0"} + {:ch, "~> 0.9.0"} ] end ``` ## Usage -#### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool - -```elixir -defaults = [ - scheme: "http", - hostname: "localhost", - port: 8123, - database: "default", - settings: [], - pool_size: 1, - timeout: :timer.seconds(15) -] - -# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have -# network access by default in the official Docker images -# see https://github.com/ClickHouse/ClickHouse/pull/75259 -{:ok, pid} = Ch.start_link(defaults) -``` - -#### Select rows - -```elixir -{:ok, pid} = Ch.start_link() - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {$0:UInt8}", [3]) - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3}) -``` - -Note on datetime encoding in query parameters: - -- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone -- `%DateTime{}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse - -#### Select rows (lots of params, reverse proxy) - -> [!NOTE] -> -> Support for multipart requests was added in `v0.6.2` - -For queries with many parameters the resulting URL can become too long for some reverse proxies, resulting in a `414 Request-URI Too Large` error. - -To avoid this, you can use the `multipart: true` option to send the query and parameters in the request body. 
- -```elixir -{:ok, pid} = Ch.start_link() - -# Moves parameters from the URL to a multipart/form-data body -%Ch.Result{rows: [[[1, 2, 3 | _rest]]]} = - Ch.query!(pid, "SELECT {ids:Array(UInt64)}", %{"ids" => Enum.to_list(1..10_000)}, multipart: true) -``` - -> [!NOTE] -> -> `multipart: true` is currently required on each individual query. Support for pool-wide configuration is planned for a future release. - -#### Insert rows - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1]) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) -``` - -#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) (efficient) - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -types = ["UInt64"] -# or -types = [Ch.Types.u64()] -# or -types = [:u64] - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types) -``` - -Note that RowBinary format encoding requires `:types` option to be provided. - -Similarly, you can use [RowBinaryWithNamesAndTypes](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes) which would additionally do something like a type check. 
- -```elixir -sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes" -opts = [names: ["id"], types: ["UInt64"]] -rows = [[0], [1]] - -%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts) -``` - -#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false) -``` - -#### Insert rows as chunked RowBinary stream - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end) -chunked = Stream.chunk_every(stream, 100) -encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) -ten_encoded_chunks = Stream.take(encoded, 10) - -%Ch.Result{num_rows: 1000} = - Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false) -``` - -This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage. - -#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) - -```elixir -{:ok, pid} = Ch.start_link() - -settings = [async_insert: 1] - -%Ch.Result{rows: [["async_insert", "Bool", "0"]]} = - Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'") - -%Ch.Result{rows: [["async_insert", "Bool", "1"]]} = - Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings) -``` - -## Caveats - -#### NULL in RowBinary - -It's the same as in [ch-go](https://clickhouse.com/docs/en/integrations/go#nullable) - -> At insert time, Nil can be passed for both the normal and Nullable version of a column. 
For the former, the default value for the type will be persisted, e.g., an empty string for string. For the nullable version, a NULL value will be stored in ClickHouse. - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, """ -CREATE TABLE ch_nulls ( - a UInt8 NULL, - b UInt8 DEFAULT 10, - c UInt8 NOT NULL -) ENGINE Memory -""") - -types = ["Nullable(UInt8)", "UInt8", "UInt8"] -inserted_rows = [[nil, nil, nil]] -selected_rows = [[nil, 0, 0]] - -%Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types) - -%Ch.Result{rows: ^selected_rows} = - Ch.query!(pid, "SELECT * FROM ch_nulls") -``` - -Note that in this example `DEFAULT 10` is ignored and `0` (the default value for `UInt8`) is persisted instead. - -However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-functions/input) can be used as a workaround: - -```elixir -sql = """ -INSERT INTO ch_nulls - SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8') - FORMAT RowBinary\ -""" - -Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]) - -%Ch.Result{rows: [[0], [10]]} = - Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") -``` - -#### UTF-8 in RowBinary - -When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns non UTF-8 characters are replaced with `�` (U+FFFD). 
This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json) - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") - -bin = "\x61\xF0\x80\x80\x80b" -utf8 = "a�b" - -%Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"]) - -%Ch.Result{rows: [[^utf8]]} = - Ch.query!(pid, "SELECT * FROM ch_utf8") - -%Ch.Result{rows: %{"data" => [[^utf8]]}} = - pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1) -``` - -To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks. - -```elixir -%Ch.Result{rows: [[^bin]]} = - Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary]) -``` - -#### Timezones in RowBinary - -Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database) - -```elixir -Mix.install([:ch, :tz]) - -:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase) - -{:ok, pid} = Ch.start_link() - -%Ch.Result{rows: [[~N[2023-04-25 17:45:09]]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime)") - -%Ch.Result{rows: [[~U[2023-04-25 17:45:11Z]]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime('UTC'))") - -%Ch.Result{rows: [[%DateTime{time_zone: "Asia/Taipei"} = taipei]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime('Asia/Taipei'))") - -"2023-04-26 01:45:12+08:00 CST Asia/Taipei" = to_string(taipei) -``` - -Encoding non-UTC datetimes works but might be slow due to timezone conversion: - -```elixir -Mix.install([:ch, :tz]) - -:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase) - -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE ch_datetimes(name String, datetime DateTime) ENGINE Memory") - -naive = NaiveDateTime.utc_now() 
-utc = DateTime.utc_now() -taipei = DateTime.shift_zone!(utc, "Asia/Taipei") - -rows = [["naive", naive], ["utc", utc], ["taipei", taipei]] - -Ch.query!(pid, "INSERT INTO ch_datetimes(name, datetime) FORMAT RowBinary", rows, types: ["String", "DateTime"]) - -%Ch.Result{ - rows: [ - ["naive", ~U[2024-12-21 05:24:40Z]], - ["utc", ~U[2024-12-21 05:24:40Z]], - ["taipei", ~U[2024-12-21 05:24:40Z]] - ] -} = - Ch.query!(pid, "SELECT name, CAST(datetime as DateTime('UTC')) FROM ch_datetimes") -``` +See guides and tests for examples. ## [Benchmarks](./bench) diff --git a/bench/compress.exs b/bench/compress.exs new file mode 100644 index 00000000..1115d332 --- /dev/null +++ b/bench/compress.exs @@ -0,0 +1 @@ +Benchee.run(%{"zstd" => fn input -> :zstd.compress(input) end}, inputs: %{"a" => "a"}) diff --git a/lib/ch.ex b/lib/ch.ex index 6f2f567a..8f9833a4 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -1,125 +1,5 @@ defmodule Ch do @moduledoc "Minimal HTTP ClickHouse client." - alias Ch.{Connection, Query, Result} - - @typedoc """ - Options shared by both connection startup and query execution. - - * `:database` - Database, defaults to `"default"` - * `:username` - Username - * `:password` - User password - * `:settings` - Keyword list of ClickHouse settings - * `:timeout` - HTTP request/receive timeout in milliseconds - """ - @type common_option :: - {:database, String.t()} - | {:username, String.t()} - | {:password, String.t()} - | {:settings, Keyword.t()} - | {:timeout, timeout} - - @typedoc """ - Options for starting the connection pool. - - Includes all keys from `t:common_option/0` and `t:DBConnection.start_option/0` plus: - - * `:scheme` - HTTP scheme, defaults to `"http"` - * `:hostname` - server hostname, defaults to `"localhost"` - * `:port` - HTTP port, defaults to `8123` - * `:transport_opts` - options to be given to the transport being used. 
See `Mint.HTTP1.connect/4` for more info - """ - @type start_option :: - common_option - | {:scheme, String.t()} - | {:hostname, String.t()} - | {:port, :inet.port_number()} - | {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]} - | DBConnection.start_option() - - @doc """ - Start the connection pool process. - - See `t:start_option/0` for available options. - """ - @spec start_link([start_option]) :: GenServer.on_start() - def start_link(opts \\ []) do - DBConnection.start_link(Connection, opts) - end - - @doc """ - Returns a supervisor child specification for a connection pool. - - See `t:start_option/0` for supported options. - """ - @spec child_spec([start_option]) :: :supervisor.child_spec() - def child_spec(opts) do - DBConnection.child_spec(Connection, opts) - end - - @typedoc """ - Options for executing a query. - - Includes all keys from `t:common_option/0` and `t:DBConnection.connection_option/0` plus: - - * `:command` - Command tag for the query - * `:headers` - Custom HTTP headers for the request - * `:format` - Custom response format for the request - * `:decode` - Whether to automatically decode the response - * `:multipart` - Whether to send the query as multipart/form-data - """ - @type query_option :: - common_option - | {:command, Ch.Query.command()} - | {:headers, [{String.t(), String.t()}]} - | {:format, String.t()} - | {:types, [String.t() | atom | tuple]} - # TODO remove - | {:encode, boolean} - | {:decode, boolean} - | {:multipart, boolean} - | DBConnection.connection_option() - - @doc """ - Runs a query and returns the result as `{:ok, %Ch.Result{}}` or - `{:error, Exception.t()}` if there was a database error. - - See `t:query_option/0` for available options. 
- """ - @spec query(DBConnection.conn(), iodata, params, [query_option]) :: - {:ok, Result.t()} | {:error, Exception.t()} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() - def query(conn, statement, params \\ [], opts \\ []) do - query = Query.build(statement, opts) - - with {:ok, _query, result} <- DBConnection.execute(conn, query, params, opts) do - {:ok, result} - end - end - - @doc """ - Runs a query and returns the result or raises `Ch.Error` if - there was an error. See `query/4`. - """ - @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t() - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() - def query!(conn, statement, params \\ [], opts \\ []) do - query = Query.build(statement, opts) - DBConnection.execute!(conn, query, params, opts) - end - - @doc false - @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t() - def stream(conn, statement, params \\ [], opts \\ []) do - query = Query.build(statement, opts) - %Ch.Stream{conn: conn, query: query, params: params, opts: opts} - end - - # TODO drop - @doc false - @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any - def run(conn, f, opts \\ []) when is_function(f, 1) do - DBConnection.run(conn, f, opts) - end if Code.ensure_loaded?(Ecto.ParameterizedType) do @behaviour Ecto.ParameterizedType diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex deleted file mode 100644 index b53394ab..00000000 --- a/lib/ch/connection.ex +++ /dev/null @@ -1,527 +0,0 @@ -defmodule Ch.Connection do - @moduledoc false - use DBConnection - require Logger - alias Ch.{Error, Query, Result, RowBinary} - alias Mint.HTTP1, as: HTTP - - @user_agent "ch/" <> Mix.Project.config()[:version] - - @typep conn :: HTTP.t() - - @impl true - @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} - def connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - 
address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - - handshake = Query.build("select 1, version()") - params = DBConnection.Query.encode(handshake, _params = [], _opts = []) - - case handle_execute(handshake, params, _opts = [], conn) do - {:ok, handshake, responses, conn} -> - case DBConnection.Query.decode(handshake, responses, _opts = []) do - %Result{rows: [[1, version]]} -> - conn = - if parse_version(version) >= parse_version("24.10") do - settings = - HTTP.get_private(conn, :settings, []) - |> Keyword.put_new(:input_format_binary_read_json_as_string, 1) - |> Keyword.put_new(:output_format_binary_write_json_as_string, 1) - - HTTP.put_private(conn, :settings, settings) - else - conn - end - - {:ok, conn} - - result -> - {:ok, _conn} = HTTP.close(conn) - reason = Error.exception("unexpected result for '#{handshake}': #{inspect(result)}") - {:error, reason} - end - - {:error, reason, conn} -> - {:ok, _conn} = HTTP.close(conn) - {:error, reason} - - {disconnect, reason, conn} when disconnect in [:disconnect, :disconnect_and_retry] -> - {:ok, _conn} = HTTP.close(conn) - {:error, reason} - end - end - catch - _kind, reason -> {:error, reason} - end - - defp parse_version(version) do - version - |> String.split(".") - |> Enum.flat_map(fn segment -> - case Integer.parse(segment) do - {int, _rest} -> [int] - :error -> [] - end - end) - end - - @impl true - @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} - def ping(conn) do - headers = [{"user-agent", @user_agent}] - - 
case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do - {:ok, conn, _response} -> {:ok, conn} - {:error, error, conn} -> {:disconnect, error, conn} - {:disconnect, _error, _conn} = disconnect -> disconnect - end - end - - @impl true - @spec checkout(conn) :: {:ok, conn} - def checkout(conn), do: {:ok, conn} - - # we "support" these four tx callbacks for Repo.checkout - # even though ClickHouse doesn't support txs - - @impl true - def handle_begin(_opts, conn), do: {:ok, %{}, conn} - @impl true - def handle_commit(_opts, conn), do: {:ok, %{}, conn} - @impl true - def handle_rollback(_opts, conn), do: {:ok, %{}, conn} - @impl true - def handle_status(_opts, conn), do: {:idle, conn} - - @impl true - def handle_prepare(_query, _opts, conn) do - {:error, Error.exception("prepared statements are not supported"), conn} - end - - @impl true - def handle_close(_query, _opts, conn) do - {:error, Error.exception("prepared statements are not supported"), conn} - end - - @impl true - def handle_declare(query, params, opts, conn) do - %Query{command: command, decode: decode} = query - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - timeout = timeout(conn, opts) - - with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), - {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do - result = %Result{ - command: command, - columns: columns, - rows: [], - num_rows: 0, - headers: headers, - data: [] - } - - {:ok, query, result, {conn, reader}} - else - {:error, _reason, _conn} = client_error -> client_error - {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} - end - end - - defp recv_declare(conn, decode, timeout) do - acc = %{decode: decode, step: :status, buffer: [], headers: []} - recv_declare_continue(conn, acc, timeout) - end - - defp recv_declare_continue(conn, acc, timeout) do - case HTTP.recv(conn, 0, timeout) do 
- {:ok, conn, responses} -> - case handle_recv_declare(responses, acc) do - {:ok, columns, headers, reader} -> - {:ok, conn, columns, headers, reader} - - {:more, acc} -> - recv_declare_continue(conn, acc, timeout) - - :error -> - all_responses_result = - case handle_all_responses(responses, []) do - {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc, timeout) - end - - with {:ok, conn, responses} <- all_responses_result do - [_status, headers | data] = responses - message = IO.iodata_to_binary(data) - - code = - if code = get_header(headers, "x-clickhouse-exception-code") do - String.to_integer(code) - end - - {:error, Error.exception(code: code, message: message), conn} - end - end - - {:error, conn, error, _responses} -> - {:disconnect, error, conn} - end - end - - defp handle_recv_declare([{:status, _ref, status} | responses], %{step: :status} = acc) do - case status do - 200 -> handle_recv_declare(responses, %{acc | step: :headers}) - _other -> :error - end - end - - defp handle_recv_declare([{:headers, _ref, headers} | responses], %{step: :headers} = acc) do - with %{decode: true} <- acc, - "RowBinaryWithNamesAndTypes" <- get_header(headers, "x-clickhouse-format") do - handle_recv_declare(responses, %{acc | headers: headers, step: :columns}) - else - _ -> - reader = %{decode: false, responses: responses} - {:ok, _columns = nil, headers, reader} - end - end - - defp handle_recv_declare([{:data, _ref, data} | responses], %{step: :columns} = acc) do - buffer = maybe_concat_buffer(acc.buffer, data) - - case RowBinary.decode_header(buffer) do - {:ok, names, types, buffer} -> - reader = %{buffer: buffer, types: types, state: nil, responses: responses} - {:ok, names, acc.headers, reader} - - :more -> - handle_recv_declare(responses, %{acc | buffer: buffer}) - end - end - - defp handle_recv_declare([], acc), do: {:more, acc} - - @compile inline: [maybe_concat_buffer: 2] - defp maybe_concat_buffer("", data), do: data - defp 
maybe_concat_buffer(buffer, data) when is_binary(buffer), do: buffer <> data - defp maybe_concat_buffer([], data), do: data - - @impl true - def handle_fetch(query, %Result{} = result, opts, {conn, reader}) do - case reader do - %{responses: []} -> - handle_fetch_recv(query, result, opts, conn, reader) - - %{decode: false, responses: responses} -> - case responses do - [{:data, _ref, data} | responses] -> - result = %Result{result | data: data} - reader = %{reader | responses: responses} - {:cont, result, {conn, reader}} - - [{:done, _ref}] -> - reader = %{reader | responses: []} - {:halt, result, {conn, reader}} - end - - %{buffer: buffer, types: types, state: state, responses: responses} -> - case responses do - [{:data, _ref, data} | responses] -> - buffer = maybe_concat_buffer(buffer, data) - {rows, buffer, state} = RowBinary.decode_rows_continue(buffer, types, state) - result = %Result{result | data: data, rows: rows, num_rows: length(rows)} - reader = %{reader | buffer: buffer, state: state, responses: responses} - {:cont, result, {conn, reader}} - - [{:done, _ref}] -> - reader = %{reader | responses: []} - {:halt, result, {conn, reader}} - end - end - end - - defp handle_fetch_recv(query, result, opts, conn, reader) do - timeout = timeout(conn, opts) - - case HTTP.recv(conn, 0, timeout) do - {:ok, conn, responses} -> - reader = %{reader | responses: responses} - handle_fetch(query, result, opts, {conn, reader}) - - {:error, conn, reason, _responses} -> - {:disconnect, reason, conn} - end - end - - @impl true - def handle_deallocate(_query, %Result{} = result, _opts, {conn, _reader}) do - case HTTP.open_request_count(conn) do - 0 -> - {:ok, %{result | data: []}, conn} - - 1 -> - error = - Error.exception("stopping stream before receiving full response by closing connection") - - {:disconnect, error, conn} - end - end - - @impl true - def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - {query_params, extra_headers, body} = params - - path 
= path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do - case HTTP.stream_request_body(conn, ref, body) do - {:ok, conn} -> {:ok, query, ref, conn} - {:error, conn, reason} -> {:disconnect_and_retry, reason, conn} - end - end - end - - def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do - case HTTP.stream_request_body(conn, ref, body) do - {:ok, conn} -> - case body do - :eof -> - with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do - {:ok, query, responses, conn} - end - - _other -> - {:ok, query, ref, conn} - end - - {:error, conn, reason} -> - {:disconnect_and_retry, reason, conn} - end - end - - def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) - else - request(conn, "POST", path, headers, body, opts) - end - - case result do - {:ok, conn, responses} -> {:ok, query, responses, conn} - {:error, _reason, _conn} = client_error -> client_error - {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} - end - end - - def handle_execute(query, params, opts, conn) do - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - case request(conn, "POST", path, headers, body, opts) do - {:ok, conn, responses} -> {:ok, query, responses, conn} - {:error, _reason, _conn} = client_error -> client_error - {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} - end - end - - @impl true - def disconnect(error, {conn, _reader}) do - disconnect(error, conn) - end - - def disconnect(_error, conn) do - {:ok = ok, _conn} = HTTP.close(conn) - ok - end - - @typep 
response :: Mint.Types.status() | Mint.Types.headers() | binary - - @spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - defp request(conn, method, path, headers, body, opts) do - with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do - receive_full_response(conn, timeout(conn, opts)) - end - end - - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - - # stacktrace is a bit cleaner with this function inlined - @compile inline: [send_request: 5] - defp send_request(conn, method, path, headers, body) do - case HTTP.request(conn, method, path, headers, body) do - {:ok, _conn, _ref} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - - @spec receive_full_response(conn, timeout) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - defp receive_full_response(conn, timeout) do - with {:ok, conn, responses} <- recv_all(conn, [], 
timeout) do - case responses do - [200, headers | _rest] -> - conn = ensure_same_server(conn, headers) - {:ok, conn, responses} - - [_status, headers | data] -> - message = IO.iodata_to_binary(data) - - code = - if code = get_header(headers, "x-clickhouse-exception-code") do - String.to_integer(code) - end - - {:error, Error.exception(code: code, message: message), conn} - end - end - end - - @spec recv_all(conn, [response], timeout()) :: - {:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn} - defp recv_all(conn, acc, timeout) do - case HTTP.recv(conn, 0, timeout) do - {:ok, conn, responses} -> - case handle_all_responses(responses, acc) do - {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv_all(conn, acc, timeout) - end - - {:error, conn, reason, _responses} -> - {:disconnect, reason, conn} - end - end - - for tag <- [:data, :status, :headers] do - defp handle_all_responses([{unquote(tag), _ref, data} | rest], acc) do - handle_all_responses(rest, [data | acc]) - end - end - - defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)} - defp handle_all_responses([], acc), do: {:more, acc} - - defp maybe_put_private(conn, _k, nil), do: conn - defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v) - - defp timeout(conn), do: HTTP.get_private(conn, :timeout) - defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn) - - defp settings(conn, opts) do - default_settings = HTTP.get_private(conn, :settings, []) - opts_settings = Keyword.get(opts, :settings, []) - Keyword.merge(default_settings, opts_settings) - end - - defp headers(conn, extra_headers, opts) do - extra_headers - |> maybe_put_new_header("x-clickhouse-user", get_opts_or_private(conn, opts, :username)) - |> maybe_put_new_header("x-clickhouse-key", get_opts_or_private(conn, opts, :password)) - |> maybe_put_new_header("x-clickhouse-database", get_opts_or_private(conn, opts, :database)) - |> maybe_put_new_header("user-agent", 
@user_agent) - end - - defp get_opts_or_private(conn, opts, key) do - Keyword.get(opts, key) || HTTP.get_private(conn, key) - end - - defp maybe_put_new_header(headers, _name, _no_value = nil), do: headers - - defp maybe_put_new_header(headers, name, value) do - if List.keymember?(headers, name, 0) do - headers - else - [{name, value} | headers] - end - end - - defp get_header(headers, key) do - case List.keyfind(headers, key, 0) do - {_, value} -> value - nil = not_found -> not_found - end - end - - defp path(conn, query_params, opts) do - settings = settings(conn, opts) - "/?" <> URI.encode_query(settings ++ query_params) - end - - @server_display_name_key :server_display_name - - @spec ensure_same_server(conn, Mint.Types.headers()) :: conn - defp ensure_same_server(conn, headers) do - expected_name = HTTP.get_private(conn, @server_display_name_key) - actual_name = get_header(headers, "x-clickhouse-server-display-name") - - cond do - expected_name && actual_name -> - unless actual_name == expected_name do - Logger.warning( - "Server mismatch detected. Expected #{inspect(expected_name)} but got #{inspect(actual_name)}!" <> - " Connection pooling might be unstable." 
- ) - end - - conn - - actual_name -> - HTTP.put_private(conn, @server_display_name_key, actual_name) - - true -> - conn - end - end -end diff --git a/lib/ch/http.ex b/lib/ch/http.ex new file mode 100644 index 00000000..6645ca60 --- /dev/null +++ b/lib/ch/http.ex @@ -0,0 +1,3 @@ +defmodule Ch.HTTP do + @moduledoc false +end diff --git a/lib/ch/pool.ex b/lib/ch/pool.ex new file mode 100644 index 00000000..2211bcdb --- /dev/null +++ b/lib/ch/pool.ex @@ -0,0 +1,39 @@ +defmodule Ch.Pool do + # @moduledoc false + # @behaviour NimblePool + + # def start_link(options) do + # NimbleOptions.validate!() + # NimblePool.start_link(worker: {__MODULE__, options}) + # end + + # def query(pool, query, options \\ []) do + # {timeout, options} = Keyword.pop(options, :checkout_timeout, to_timeout(second: 5)) + # req = Ch.HTTP.build(query, options) + + # NimblePool.checkout!( + # pool, + # :query, + # fn {pid, _ref}, conn -> + # with {:ok, conn} <- ensure_connected(conn) do + # case Ch.HTTP.request(conn, req) do + # {:ok, conn, result} -> {{:ok, result}, {:ok, conn}} + # {:error, reason, conn} -> {{:error, reason}, {:ok, conn}} + # {:disconnect, reason, conn} -> {{:error, reason}, {:remove, conn, reason}} + # end + # end + # end, + # timeout + # ) + # end + + # @impl NimblePool + # def init_pool(init_arg) do + # pool_state = %{} + # {:ok, pool_state} + # end + + # @impl NimblePool + # def init_worker(pool_state) do + # end +end diff --git a/lib/ch/query.ex b/lib/ch/query.ex deleted file mode 100644 index 37e05966..00000000 --- a/lib/ch/query.ex +++ /dev/null @@ -1,420 +0,0 @@ -defmodule Ch.Query do - @moduledoc "Query struct wrapping the SQL statement." - defstruct [:statement, :command, :encode, :decode, :multipart] - - @typedoc """ - The Query struct. - - ## Fields - - * `:statement` - The SQL statement to be executed (as `t:iodata/0`). - * `:command` - The detected or enforced SQL command type (e.g., `:select`, `:insert`). 
- * `:encode` - Whether to encode parameters (defaults to `true`). - * `:decode` - Whether to decode the response (defaults to `true`). - * `:multipart` - Whether to use `multipart/form-data` for the request (defaults to `false`). - """ - @type t :: %__MODULE__{ - statement: iodata, - command: command, - encode: boolean, - decode: boolean, - multipart: boolean - } - - @doc false - @spec build(iodata, [Ch.query_option()]) :: t - def build(statement, opts \\ []) do - command = Keyword.get(opts, :command) || extract_command(statement) - encode = Keyword.get(opts, :encode, true) - decode = Keyword.get(opts, :decode, true) - multipart = Keyword.get(opts, :multipart, false) - - %__MODULE__{ - statement: statement, - command: command, - encode: encode, - decode: decode, - multipart: multipart - } - end - - statements = [ - {"SELECT", :select}, - {"INSERT", :insert}, - {"CREATE", :create}, - {"ALTER", :alter}, - {"DELETE", :delete}, - {"SYSTEM", :system}, - {"SHOW", :show}, - # as of ClickHouse 24.11, WITH is only allowed in SELECT - # https://clickhouse.com/docs/en/sql-reference/statements/select/with/ - {"WITH", :select}, - {"GRANT", :grant}, - {"EXPLAIN", :explain}, - {"REVOKE", :revoke}, - {"UPDATE", :update}, - {"ATTACH", :attach}, - {"CHECK", :check}, - {"DESCRIBE", :describe}, - {"DETACH", :detach}, - {"DROP", :drop}, - {"EXISTS", :exists}, - {"KILL", :kill}, - {"OPTIMIZE", :optimize}, - {"RENAME", :rename}, - {"EXCHANGE", :exchange}, - {"SET", :set}, - {"TRUNCATE", :truncate}, - {"USE", :use}, - {"WATCH", :watch}, - {"MOVE", :move}, - {"UNDROP", :undrop} - ] - - command_union = - statements - |> Enum.map(fn {_, command} -> command end) - |> Enum.reduce(&{:|, [], [&1, &2]}) - - @typedoc """ - Atom representing the type of SQL command. - - Derived automatically from the start of the SQL statement (e.g., `"SELECT ..."` -> `:select`), - or provided explicitly via options. 
-  """
-  @type command :: unquote(command_union)
-
-  defp extract_command(statement)
-
-  for {statement, command} <- statements do
-    defp extract_command(unquote(statement) <> _), do: unquote(command)
-    defp extract_command(unquote(String.downcase(statement)) <> _), do: unquote(command)
-  end
-
-  defp extract_command(<<whitespace, rest::binary>>) when whitespace in [?\s, ?\t, ?\n] do
-    extract_command(rest)
-  end
-
-  defp extract_command([first_segment | _] = statement) do
-    extract_command(first_segment) || extract_command(IO.iodata_to_binary(statement))
-  end
-
-  defp extract_command(_other), do: nil
-end
-
-defimpl DBConnection.Query, for: Ch.Query do
-  @dialyzer :no_improper_lists
-  alias Ch.{Query, Result, RowBinary}
-
-  @spec parse(Query.t(), [Ch.query_option()]) :: Query.t()
-  def parse(query, _opts), do: query
-
-  @spec describe(Query.t(), [Ch.query_option()]) :: Query.t()
-  def describe(query, _opts), do: query
-
-  # stream: insert init
-  @spec encode(Query.t(), {:stream, term}, [Ch.query_option()]) ::
-          {:stream, {[{String.t(), String.t()}], Mint.Types.headers(), iodata}}
-  def encode(query, {:stream, params}, opts) do
-    {:stream, encode(query, params, opts)}
-  end
-
-  # stream: insert data chunk
-  @spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) ::
-          {:stream, Mint.Types.request_ref(), iodata | :eof}
-  def encode(_query, {:stream, ref, data}, _opts) do
-    {:stream, ref, data}
-  end
-
-  @spec encode(Query.t(), params, [Ch.query_option()]) ::
-          {query_params, Mint.Types.headers(), body}
-        when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(),
-             query_params: [{String.t(), String.t()}],
-             body: iodata | Enumerable.t()
-
-  def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do
-    body =
-      case data do
-        _ when is_list(data) or is_binary(data) -> [statement, ?\n | data]
-        _ -> Stream.concat([[statement, ?\n]], data)
-      end
-
-    {_query_params = [], headers(opts), body}
-  end
-
-  def 
encode(%Query{command: :insert, statement: statement}, params, opts) do - cond do - names = Keyword.get(opts, :names) -> - types = Keyword.fetch!(opts, :types) - header = RowBinary.encode_names_and_types(names, types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n, header | data]} - - format_row_binary?(statement) -> - types = Keyword.fetch!(opts, :types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n | data]} - - true -> - {query_params(params), headers(opts), statement} - end - end - - def encode(%Query{multipart: true, statement: statement}, params, opts) do - types = Keyword.get(opts, :types) - default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" - format = Keyword.get(opts, :format) || default_format - - boundary = "ChFormBoundary" <> Base.url_encode64(:crypto.strong_rand_bytes(24)) - content_type = "multipart/form-data; boundary=\"#{boundary}\"" - enc_boundary = "--#{boundary}\r\n" - multipart = multipart_params(params, enc_boundary) - multipart = add_multipart_part(multipart, "query", statement, enc_boundary) - multipart = [multipart | "--#{boundary}--\r\n"] - - {_no_query_params = [], - [{"x-clickhouse-format", format}, {"content-type", content_type} | headers(opts)], multipart} - end - - def encode(%Query{statement: statement}, params, opts) do - types = Keyword.get(opts, :types) - default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" - format = Keyword.get(opts, :format) || default_format - {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement} - end - - defp multipart_params(params, boundary) when is_map(params) do - multipart_named_params(Map.to_list(params), boundary, []) - end - - defp multipart_params(params, boundary) when is_list(params) do - multipart_positional_params(params, 0, boundary, []) - end - - defp multipart_named_params([{name, value} | params], 
boundary, acc) do - acc = - add_multipart_part( - acc, - "param_" <> URI.encode_www_form(name), - encode_param(value), - boundary - ) - - multipart_named_params(params, boundary, acc) - end - - defp multipart_named_params([], _boundary, acc), do: acc - - defp multipart_positional_params([value | params], idx, boundary, acc) do - acc = - add_multipart_part( - acc, - "param_$" <> Integer.to_string(idx), - encode_param(value), - boundary - ) - - multipart_positional_params(params, idx + 1, boundary, acc) - end - - defp multipart_positional_params([], _idx, _boundary, acc), do: acc - - @compile inline: [add_multipart_part: 4] - defp add_multipart_part(multipart, name, value, boundary) do - part = [ - boundary, - "content-disposition: form-data; name=\"", - name, - "\"\r\n\r\n", - value, - "\r\n" - ] - - case multipart do - [] -> part - _ -> [multipart | part] - end - end - - defp format_row_binary?(statement) when is_binary(statement) do - statement |> String.trim_trailing() |> String.ends_with?("RowBinary") - end - - defp format_row_binary?(statement) when is_list(statement) do - statement - |> IO.iodata_to_binary() - |> format_row_binary?() - end - - # stream: select result - @spec decode(Query.t(), result, [Ch.query_option()]) :: result when result: Result.t() - def decode(_query, %Result{} = result, _opts), do: result - # stream: insert result - @spec decode(Query.t(), ref, [Ch.query_option()]) :: ref when ref: Mint.Types.request_ref() - def decode(_query, ref, _opts) when is_reference(ref), do: ref - - @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() - when response: Mint.Types.status() | Mint.Types.headers() | binary - def decode(%Query{command: :insert}, responses, _opts) do - [_status, headers | _data] = responses - - num_rows = - if summary = get_header(headers, "x-clickhouse-summary") do - summary = Jason.decode!(summary) - - if written_rows = Map.get(summary, "written_rows") do - String.to_integer(written_rows) - end - end - - 
%Result{num_rows: num_rows, rows: nil, command: :insert, headers: headers} - end - - def decode(%Query{decode: false, command: command}, responses, _opts) when is_list(responses) do - # TODO potentially fails on x-progress-headers - [_status, headers | data] = responses - %Result{rows: data, data: data, command: command, headers: headers} - end - - def decode(%Query{command: command}, responses, opts) when is_list(responses) do - # TODO potentially fails on x-progress-headers - [_status, headers | data] = responses - - case get_header(headers, "x-clickhouse-format") do - "RowBinary" -> - types = Keyword.fetch!(opts, :types) - rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows(types) - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} - - "RowBinaryWithNamesAndTypes" -> - [names | rows] = data |> IO.iodata_to_binary() |> RowBinary.decode_names_and_rows() - - %Result{ - num_rows: length(rows), - columns: names, - rows: rows, - command: command, - headers: headers - } - - _other -> - %Result{rows: data, data: data, command: command, headers: headers} - end - end - - defp get_header(headers, key) do - case List.keyfind(headers, key, 0) do - {_, value} -> value - nil = not_found -> not_found - end - end - - defp query_params(params) when is_map(params) do - Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end) - end - - defp query_params(params) when is_list(params) do - params - |> Enum.with_index() - |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end) - end - - defp encode_param(n) when is_integer(n), do: Integer.to_string(n) - defp encode_param(f) when is_float(f), do: Float.to_string(f) - - # TODO possibly speed up - # For more info see - # https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters - # "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting - defp encode_param(b) when is_binary(b) do - escape_param([{"\\", "\\\\"}, 
{"\t", "\\\t"}, {"\n", "\\\n"}], b) - end - - defp encode_param(b) when is_boolean(b), do: Atom.to_string(b) - defp encode_param(nil), do: "\\N" - defp encode_param(%Decimal{} = d), do: Decimal.to_string(d, :normal) - defp encode_param(%Date{} = date), do: Date.to_iso8601(date) - defp encode_param(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive) - defp encode_param(%Time{} = time), do: Time.to_iso8601(time) - - defp encode_param(%DateTime{microsecond: microsecond} = dt) do - dt = DateTime.shift_zone!(dt, "Etc/UTC") - - case microsecond do - {val, precision} when val > 0 and precision > 0 -> - size = round(:math.pow(10, precision)) - unix = DateTime.to_unix(dt, size) - seconds = div(unix, size) - fractional = rem(unix, size) - - IO.iodata_to_binary([ - Integer.to_string(seconds), - ?., - String.pad_leading(Integer.to_string(fractional), precision, "0") - ]) - - _ -> - dt |> DateTime.to_unix(:second) |> Integer.to_string() - end - end - - defp encode_param(tuple) when is_tuple(tuple) do - IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)]) - end - - defp encode_param(a) when is_list(a) do - IO.iodata_to_binary([?[, encode_array_params(a), ?]]) - end - - defp encode_param(m) when is_map(m) do - IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}]) - end - - defp encode_array_params([last]), do: encode_array_param(last) - - defp encode_array_params([s | rest]) do - [encode_array_param(s), ?, | encode_array_params(rest)] - end - - defp encode_array_params([] = empty), do: empty - - defp encode_map_params([last]), do: encode_map_param(last) - - defp encode_map_params([kv | rest]) do - [encode_map_param(kv), ?, | encode_map_params(rest)] - end - - defp encode_map_params([] = empty), do: empty - - defp encode_array_param(s) when is_binary(s) do - [?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?'] - end - - defp encode_array_param(nil), do: "null" - - defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do 
- [?', encode_param(param), ?'] - end - - defp encode_array_param(v), do: encode_param(v) - - defp encode_map_param({k, v}) do - [encode_array_param(k), ?:, encode_array_param(v)] - end - - defp escape_param([{pattern, replacement} | escapes], param) do - param = String.replace(param, pattern, replacement) - escape_param(escapes, param) - end - - defp escape_param([], param), do: param - - @spec headers(Keyword.t()) :: Mint.Types.headers() - defp headers(opts), do: Keyword.get(opts, :headers, []) -end - -defimpl String.Chars, for: Ch.Query do - def to_string(%{statement: statement}) do - IO.iodata_to_binary(statement) - end -end diff --git a/lib/ch/result.ex b/lib/ch/result.ex deleted file mode 100644 index 8d4f0868..00000000 --- a/lib/ch/result.ex +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Ch.Result do - @moduledoc """ - Result struct returned from any successful query. - """ - - defstruct [:command, :num_rows, :columns, :rows, :headers, :data] - - @typedoc """ - The Result struct. - - ## Fields - - * `:command` - An atom of the query command, for example: `:select`, `:insert` - * `:columns` - A list of column names - * `:rows` - A list of lists (each inner list corresponding to a row, each element in the inner list corresponds to a column) - * `:num_rows` - The number of fetched or affected rows - * `:headers` - The HTTP response headers - * `:data` - The raw iodata from the response - """ - @type t :: %__MODULE__{ - command: Ch.Query.command() | nil, - num_rows: non_neg_integer | nil, - columns: [String.t()] | nil, - rows: [[term]] | iodata | nil, - headers: Mint.Types.headers(), - data: iodata - } -end diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index ae6c5346..a59da1ce 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -197,7 +197,7 @@ defmodule Ch.RowBinary do # assuming it can be sent as text and not "native" binary JSON # i.e. 
assumes `settings: [input_format_binary_read_json_as_string: 1]` # TODO - encode(:string, Jason.encode_to_iodata!(json)) + encode(:string, JSON.encode_to_iodata!(json)) end def encode({:fixed_string, size}, str) when byte_size(str) == size do @@ -886,7 +886,7 @@ defmodule Ch.RowBinary do rows, types ) do - decode_rows(types_rest, bin, [Jason.decode!(s) | row], rows, types) + decode_rows(types_rest, bin, [JSON.decode!(s) | row], rows, types) end end diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex deleted file mode 100644 index 9ec8b5fd..00000000 --- a/lib/ch/stream.ex +++ /dev/null @@ -1,43 +0,0 @@ -defmodule Ch.Stream do - @moduledoc false - - @derive {Inspect, only: []} - defstruct [:conn, :ref, :query, :params, :opts] - - @type t :: %__MODULE__{ - conn: DBConnection.conn(), - ref: Mint.Types.request_ref() | nil, - query: Ch.Query.t(), - params: term, - opts: [Ch.query_option()] - } - - defimpl Enumerable do - def reduce(stream, acc, fun) do - %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream - stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} - DBConnection.reduce(stream, acc, fun) - end - - def member?(_, _), do: {:error, __MODULE__} - def count(_), do: {:error, __MODULE__} - def slice(_), do: {:error, __MODULE__} - end - - defimpl Collectable do - def into(stream) do - %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream - ref = DBConnection.execute!(conn, query, {:stream, params}, opts) - {%{stream | ref: ref}, &collect/2} - end - - defp collect(%{conn: conn, query: query, ref: ref} = stream, {:cont, data}) do - ^ref = DBConnection.execute!(conn, query, {:stream, ref, data}) - stream - end - - defp collect(%{conn: conn, query: query, ref: ref}, eof) when eof in [:halt, :done] do - DBConnection.execute!(conn, query, {:stream, ref, :eof}) - end - end -end diff --git a/lib/ch/telemetry.ex b/lib/ch/telemetry.ex new file mode 100644 index 00000000..a19b4bcd --- /dev/null +++ 
b/lib/ch/telemetry.ex @@ -0,0 +1,23 @@ +defmodule Ch.Telemetry do + @moduledoc """ + TODO + """ + + @default_handler_id "ch-default-handler" + + def attach_default_handler do + :telemetry.attach_many(@default_handler_id, [[]], &__MODULE__.handle_event/4, _no_config = []) + end + + def detach_default_handler do + :telemetry.detach(@default_handler_id) + end + + @doc false + def handle_event([:ch | event], _measurements, metadata, _config) do + case {event, metadata} do + {[:connect, _stop_or_exception], %{kind: _, reason: _}} -> + :ok + end + end +end diff --git a/mix.exs b/mix.exs index 92a19e1c..379dddb9 100644 --- a/mix.exs +++ b/mix.exs @@ -2,13 +2,13 @@ defmodule Ch.MixProject do use Mix.Project @source_url "https://github.com/plausible/ch" - @version "0.7.1" + @version "0.9.0" def project do [ app: :ch, version: @version, - elixir: "~> 1.15", + elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), deps: deps(), name: "Ch", @@ -31,7 +31,6 @@ defmodule Ch.MixProject do defp elixirc_paths(:test), do: ["lib", "test/support"] defp elixirc_paths(_env), do: ["lib"] - defp extra_applications(:test), do: [:inets, :tools] defp extra_applications(:dev), do: [:tools] defp extra_applications(_env), do: [] @@ -39,8 +38,10 @@ defmodule Ch.MixProject do defp deps do [ {:mint, "~> 1.0"}, - {:db_connection, "~> 2.9.0"}, - {:jason, "~> 1.0"}, + {:nimble_pool, "~> 1.1"}, + {:nimble_options, "~> 1.1"}, + {:telemetry, "~> 1.4"}, + {:telemetry_docs, "~> 0.1.0"}, {:decimal, "~> 2.0"}, {:ecto, "~> 3.13.0", optional: true}, {:benchee, "~> 1.0", only: [:bench]}, diff --git a/mix.lock b/mix.lock index 92957b86..a3009f33 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,5 @@ %{ "benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: 
"hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"}, - "db_connection": {:hex, :db_connection, "2.9.0", "a6a97c5c958a2d7091a58a9be40caf41ab496b0701d21e1d1abff3fa27a7f371", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "17d502eacaf61829db98facf6f20808ed33da6ccf495354a41e64fe42f9c509c"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, @@ -9,13 +8,15 @@ "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], 
"hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, - "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", 
"4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, - "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"}, + "telemetry_docs": {:hex, :telemetry_docs, "0.1.0", "9c95cdfcf34960b6533ff19d56a2ba2f1980aad7f2c3191d3e7ec91a85e6f74c", [:mix], [{:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "e59ac534d29437d75680a1af9a0d7c67569ffe146c35e47809b9fdabd5e04dc1"}, "tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"}, } diff --git a/test/support/help.ex b/test/support/help.ex new file mode 100644 index 00000000..e169c2b9 --- /dev/null +++ b/test/support/help.ex @@ -0,0 +1,203 @@ +defmodule Help do + @moduledoc false + alias Mint.HTTP1, as: HTTP + + def sql(query, params \\ %{}, options \\ []) do + case connect(options) do + {:ok, conn} -> + case request(conn, query, params, options) do + {:ok, conn, response} -> + close(conn) + response + + {:error, reason, conn} -> + close(conn) + raise reason + + 
{:disconnect, reason, conn} ->
+            close(conn)
+            raise reason
+        end
+
+      {:error, reason} ->
+        raise reason
+    end
+  end
+
+  defp connect(options) do
+    scheme = Keyword.get(options, :scheme, :http)
+    host = Keyword.get(options, :host, "localhost")
+    port = Keyword.get(options, :port, 8123)
+    HTTP.connect(scheme, host, port, mode: :passive)
+  end
+
+  @user_agent "ch/" <> Mix.Project.config()[:version]
+
+  defp request(conn, query, params, options) do
+    timeout = Keyword.get(options, :timeout, to_timeout(second: 5))
+    deadline = System.monotonic_time(:millisecond) + timeout
+
+    settings = Keyword.get(options, :settings, [])
+    params = Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end)
+    path = "/?" <> URI.encode_query(settings ++ params)
+
+    headers =
+      Keyword.get(options, :headers, [])
+      |> maybe_put_new_header("x-clickhouse-user", Keyword.get(options, :username))
+      |> maybe_put_new_header("x-clickhouse-key", Keyword.get(options, :password))
+      |> maybe_put_new_header("x-clickhouse-database", Keyword.get(options, :database))
+      |> maybe_put_new_header("user-agent", @user_agent)
+
+    with {:ok, conn, _ref} <- HTTP.request(conn, "POST", path, headers, query) do
+      receive_response(conn, deadline)
+    end
+  end
+
+  defp receive_response(conn, deadline) do
+    with {:ok, conn, responses} <- recv_all(conn, [], deadline) do
+      case responses do
+        [200, _headers | data] ->
+          {:ok, conn, IO.iodata_to_binary(data)}
+
+        [_status, headers | data] ->
+          message = IO.iodata_to_binary(data)
+
+          code =
+            if code = get_header(headers, "x-clickhouse-exception-code") do
+              String.to_integer(code)
+            end
+
+          {:error, Ch.Error.exception(code: code, message: message), conn}
+      end
+    end
+  end
+
+  defp recv_all(conn, acc, deadline) do
+    timeout = max(0, deadline - System.monotonic_time(:millisecond))
+
+    case HTTP.recv(conn, 0, timeout) do
+      {:ok, conn, responses} ->
+        case handle_all_responses(responses, acc) do
+          {:ok, responses} -> {:ok, conn, responses}
+          {:more, acc} -> recv_all(conn, acc, 
deadline) + end + + {:error, conn, reason, _responses} -> + {:disconnect, reason, conn} + end + end + + for tag <- [:data, :status, :headers] do + defp handle_all_responses([{unquote(tag), _ref, data} | rest], acc) do + handle_all_responses(rest, [data | acc]) + end + end + + defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)} + defp handle_all_responses([], acc), do: {:more, acc} + + defp close(conn) do + HTTP.close(conn) + :ok + end + + defp maybe_put_new_header(headers, _name, _no_value = nil), do: headers + + defp maybe_put_new_header(headers, name, value) do + if List.keymember?(headers, name, 0) do + headers + else + [{name, value} | headers] + end + end + + defp encode_param(n) when is_integer(n), do: Integer.to_string(n) + defp encode_param(f) when is_float(f), do: Float.to_string(f) + + # TODO possibly speed up + # For more info see + # https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters + # "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting + defp encode_param(b) when is_binary(b) do + escape_param([{"\\", "\\\\"}, {"\t", "\\\t"}, {"\n", "\\\n"}], b) + end + + defp encode_param(b) when is_boolean(b), do: Atom.to_string(b) + defp encode_param(nil), do: "\\N" + defp encode_param(%Decimal{} = d), do: Decimal.to_string(d, :normal) + defp encode_param(%Date{} = date), do: Date.to_iso8601(date) + defp encode_param(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive) + defp encode_param(%Time{} = time), do: Time.to_iso8601(time) + + defp encode_param(%DateTime{microsecond: microsecond} = dt) do + dt = DateTime.shift_zone!(dt, "Etc/UTC") + + case microsecond do + {val, precision} when val > 0 and precision > 0 -> + size = round(:math.pow(10, precision)) + unix = DateTime.to_unix(dt, size) + seconds = div(unix, size) + fractional = rem(unix, size) + + IO.iodata_to_binary([ + Integer.to_string(seconds), + ?., + 
String.pad_leading(Integer.to_string(fractional), precision, "0") + ]) + + _ -> + dt |> DateTime.to_unix(:second) |> Integer.to_string() + end + end + + defp encode_param(tuple) when is_tuple(tuple) do + IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)]) + end + + defp encode_param(a) when is_list(a) do + IO.iodata_to_binary([?[, encode_array_params(a), ?]]) + end + + defp encode_param(m) when is_map(m) do + IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}]) + end + + defp encode_array_params([last]), do: encode_array_param(last) + + defp encode_array_params([s | rest]) do + [encode_array_param(s), ?, | encode_array_params(rest)] + end + + defp encode_array_params([] = empty), do: empty + + defp encode_map_params([last]), do: encode_map_param(last) + + defp encode_map_params([kv | rest]) do + [encode_map_param(kv), ?, | encode_map_params(rest)] + end + + defp encode_map_params([] = empty), do: empty + + defp encode_array_param(s) when is_binary(s) do + [?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?'] + end + + defp encode_array_param(nil), do: "null" + + defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do + [?', encode_param(param), ?'] + end + + defp encode_array_param(v), do: encode_param(v) + + defp encode_map_param({k, v}) do + [encode_array_param(k), ?:, encode_array_param(v)] + end + + defp escape_param([{pattern, replacement} | escapes], param) do + param = String.replace(param, pattern, replacement) + escape_param(escapes, param) + end + + defp escape_param([], param), do: param +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 97caedd8..a9e54b02 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,10 +1,7 @@ clickhouse_available? 
= - case :httpc.request(:get, {~c"http://localhost:8123/ping", []}, [], []) do - {:ok, {{_version, _status = 200, _reason}, _headers, ~c"Ok.\n"}} -> - true - - {:error, {:failed_connect, [{:to_address, _to_address}, {:inet, [:inet], :econnrefused}]}} -> - false + case Help.http("http://localhost:8123/ping") do + {200, _headers, "Ok.\n"} -> true + {:error, :econnrefused} -> false end unless clickhouse_available? do @@ -18,29 +15,4 @@ unless clickhouse_available? do end Calendar.put_time_zone_database(Tz.TimeZoneDatabase) -default_test_db = System.get_env("CH_DATABASE", "ch_elixir_test") -Application.put_env(:ch, :database, default_test_db) - -Ch.Test.query( - "DROP DATABASE IF EXISTS {db:Identifier}", - %{"db" => default_test_db}, - database: "default" -) - -Ch.Test.query( - "CREATE DATABASE {db:Identifier}", - %{"db" => default_test_db}, - database: "default" -) - -%{rows: [[ch_version]]} = Ch.Test.query("SELECT version()") - -extra_exclude = - if ch_version >= "25" do - [] - else - # Time, Variant, JSON, and Dynamic types are not supported in older ClickHouse versions we have in the CI - [:time, :variant, :json, :dynamic] - end - -ExUnit.start(exclude: [:slow | extra_exclude]) +ExUnit.start() From cb4b48aecb0fbaffc6d1c8183576446ff52dc7b1 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 8 Apr 2026 18:34:17 +0300 Subject: [PATCH 2/3] maybe compression can be done outside of the buffer --- bench/compress.exs | 16 +++++++++++++++- bench/support/compress.ex | 16 ++++++++++++++++ mix.exs | 4 +++- mix.lock | 9 +++++++++ 4 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 bench/support/compress.ex diff --git a/bench/compress.exs b/bench/compress.exs index 1115d332..649a668a 100644 --- a/bench/compress.exs +++ b/bench/compress.exs @@ -1 +1,15 @@ -Benchee.run(%{"zstd" => fn input -> :zstd.compress(input) end}, inputs: %{"a" => "a"}) +Benchee.run( + %{ + "zstd once" => fn input -> :zstd.compress(input) end, + "zstd stream" => fn input -> 
Compress.zstd_stream(input) end, + "nimble_lz4 once" => fn input -> NimbleLZ4.compress(input) end + }, + inputs: %{ + "1" => List.duplicate("a", 1), + "10" => List.duplicate("a", 10), + "100" => List.duplicate("a", 100), + "1000" => List.duplicate("a", 1000), + "10000" => List.duplicate("a", 10000), + "100000" => List.duplicate("a", 100_000) + } +) diff --git a/bench/support/compress.ex b/bench/support/compress.ex new file mode 100644 index 00000000..4de5be75 --- /dev/null +++ b/bench/support/compress.ex @@ -0,0 +1,16 @@ +defmodule Compress do + def zstd_stream(input) when is_list(input) do + {:ok, ctx} = :zstd.context(:compress) + zstd_stream_continue(input, ctx) + end + + defp zstd_stream_continue([value | rest], ctx) do + {:continue, c} = :zstd.stream(ctx, value) + [c | zstd_stream_continue(rest, ctx)] + end + + defp zstd_stream_continue([], ctx) do + {:done, c} = :zstd.finish(ctx, []) + c + end +end diff --git a/mix.exs b/mix.exs index 379dddb9..1cc50ca0 100644 --- a/mix.exs +++ b/mix.exs @@ -29,6 +29,7 @@ defmodule Ch.MixProject do # Specifies which paths to compile per environment. 
defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(:bench), do: ["lib", "bench/support"] defp elixirc_paths(_env), do: ["lib"] defp extra_applications(:dev), do: [:tools] @@ -47,7 +48,8 @@ defmodule Ch.MixProject do {:benchee, "~> 1.0", only: [:bench]}, {:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :docs}, - {:tz, "~> 0.28.1", only: [:test]} + {:tz, "~> 0.28.1", only: [:test]}, + {:nimble_lz4, "~> 1.1", only: [:dev, :test, :bench]} ] end diff --git a/mix.lock b/mix.lock index a3009f33..240edfa7 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,6 @@ %{ "benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"}, + "castore": {:hex, :castore, "1.0.18", "5e43ef0ec7d31195dfa5a65a86e6131db999d074179d2ba5a8de11fe14570f55", [:mix], [], "hexpm", "f393e4fe6317829b158fb74d86eb681f737d2fe326aa61ccf6293c4104957e34"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, @@ -7,16 +8,24 @@ "ecto": {:hex, :ecto, "3.13.5", 
"9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, + "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, + "jason": {:hex, 
:jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_lz4": {:hex, :nimble_lz4, "1.1.0", "53b87e37f1efc79fda6433ab35563788a628c7d33aef45d16f31a86a399a3cc5", [:mix], [{:rustler, "~> 0.34.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:rustler_precompiled, "~> 0.7.2", [hex: :rustler_precompiled, repo: "hexpm", 
optional: false]}], "hexpm", "2c1d46eee76c5bbba8d6d3d23c75210dcb509f6698f0a01fb95015bf95f1b6d3"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "req": {:hex, :req, "0.5.17", "0096ddd5b0ed6f576a03dde4b158a0c727215b15d2795e59e0916c6971066ede", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0b8bc6ffdfebbc07968e59d3ff96d52f2202d0536f10fef4dc11dc02a2a43e39"}, + "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, + "rustler_precompiled": {:hex, :rustler_precompiled, "0.7.3", "42cb9449785cd86c87453e39afdd27a0bdfa5c77a4ec5dc5ce45112e06b9f89b", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: 
false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "cbc4b3777682e5f6f43ed39b0e0b4a42dccde8053aba91b4514e8f5ff9a5ac6d"},
  "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"},
  "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"},
  "telemetry_docs": {:hex, :telemetry_docs, "0.1.0", "9c95cdfcf34960b6533ff19d56a2ba2f1980aad7f2c3191d3e7ec91a85e6f74c", [:mix], [{:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "e59ac534d29437d75680a1af9a0d7c67569ffe146c35e47809b9fdabd5e04dc1"},
+  "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
  "tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"},
}

From 0957ea6c3e0cd1a791ca92cdbaf9eb90c75375f6 Mon Sep 17 00:00:00 2001
From: ruslandoga
Date: Wed, 8 Apr 2026 18:40:46 +0300
Subject: [PATCH 3/3] more entropy

---
 bench/compress.exs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/bench/compress.exs b/bench/compress.exs
index 649a668a..78f93eae 100644
--- a/bench/compress.exs
+++ b/bench/compress.exs
@@ -1,3 +1,10 @@
+rowbinary = fn count ->
+  Enum.map(1..count, fn i ->
+    row = [i, "Golang SQL database driver", [1, 2, 3, 4, 5, 6, 7, 8, 9], DateTime.utc_now()]
+    Ch.RowBinary.encode_row(row, ["UInt64", "String", "Array(UInt8)", 
"DateTime"]) + end) +end + Benchee.run( %{ "zstd once" => fn input -> :zstd.compress(input) end, @@ -5,11 +12,8 @@ Benchee.run( "nimble_lz4 once" => fn input -> NimbleLZ4.compress(input) end }, inputs: %{ - "1" => List.duplicate("a", 1), - "10" => List.duplicate("a", 10), - "100" => List.duplicate("a", 100), - "1000" => List.duplicate("a", 1000), - "10000" => List.duplicate("a", 10000), - "100000" => List.duplicate("a", 100_000) + "1 rows" => rowbinary.(1), + "1000 rows" => rowbinary.(1000), + "100,000 rows" => rowbinary.(100_000) } )