Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions lib/ch/row_binary.ex
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,9 @@ defmodule Ch.RowBinary do
_ when is_integer(value) -> [0x0A | encode(:i64, value)]
_ when is_float(value) -> [0x0E | encode(:f64, value)]
%Date{} -> [0x0F | encode(:date, value)]
%DateTime{} -> [0x11 | encode(:datetime, value)]
%NaiveDateTime{} -> [0x11 | encode(:datetime, value)]
%{} -> [0x30, 0x00, 0x80, 0x08, 0x20, 0x00, 0x00, 0x00 | encode(:json, value)]
[] -> [0x1E, 0x00]
end
end
Expand Down Expand Up @@ -1136,6 +1138,74 @@ defmodule Ch.RowBinary do
decode_dynamic_continue(rest, [:low_cardinality | dynamic], types_rest, row, rows, types)
end

# JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp) 0x30<uint8_serialization_version><var_int_max_dynamic_paths><uint8_max_dynamic_types><var_uint_number_of_typed_paths><var_uint_path_name_size_1><path_name_data_1><encoded_type_1>...<var_uint_number_of_skip_paths><var_uint_skip_path_size_1><skip_path_data_1>...<var_uint_number_of_skip_path_regexps><var_uint_skip_path_regexp_size_1><skip_path_data_regexp_1>...
#
# Walks over the JSON type header (type code 0x30) without keeping any of its
# parameters: every field is read only to find where the header ends. The type
# recorded on the `dynamic` stack is the bare `:json`, and decoding resumes via
# `decode_dynamic_continue/6` on the bytes that follow the header.
defp decode_dynamic(<<0x30, header::bytes>>, dynamic, types_rest, row, rows, types) do
  # uint8_serialization_version must be 0; any other value fails this match.
  <<0x00, after_version::bytes>> = header

  # var_int_max_dynamic_paths and uint8_max_dynamic_types are parsed and dropped.
  {_max_dynamic_paths, after_max_paths} = read_varint(after_version)
  <<_max_dynamic_types, after_limits::bytes>> = after_max_paths

  # Typed paths: a var_uint count, then for each path a length-prefixed name
  # followed by an encoded type, which `skip_type/1` steps over.
  {typed_path_count, after_count} = read_varint(after_limits)

  after_typed_paths =
    for _ <- 1..typed_path_count//1, reduce: after_count do
      bin ->
        {name_size, bin} = read_varint(bin)
        <<_name::size(name_size)-bytes, bin::bytes>> = bin
        skip_type(bin)
    end

  # SKIP paths and SKIP REGEXP paths share one layout: a var_uint count followed
  # by that many length-prefixed strings.
  after_skip_paths = skip_json_strings(after_typed_paths)
  after_skip_regexps = skip_json_strings(after_skip_paths)

  decode_dynamic_continue(after_skip_regexps, [:json | dynamic], types_rest, row, rows, types)
end

# Reads a var_uint count and then skips that many var_uint-length-prefixed
# strings, returning the binary that follows the last one.
defp skip_json_strings(bin) do
  {count, bin} = read_varint(bin)

  # The explicit `//1` step keeps the range empty when `count` is 0.
  for _ <- 1..count//1, reduce: bin do
    bin ->
      {size, bin} = read_varint(bin)
      <<_string::size(size)-bytes, bin::bytes>> = bin
      bin
  end
end

# Generates one `read_varint/1` clause per `{pattern, value}` entry in `varints`.
# Each clause matches `pattern` at the start of the binary and returns
# `{value, rest}`, where `rest` is whatever follows the matched segments.
# NOTE(review): `varints` is defined outside this hunk; presumably each entry
# pairs quoted binary segments with the expression rebuilding the integer —
# confirm against its definition.
for {segments, decoded} <- varints do
  defp read_varint(<<unquote(segments), rest::bytes>>) do
    {unquote(decoded), rest}
  end
end

# Single-byte type codes that `skip_type/1` accepts in addition to the codes
# listed in `dynamic_types`.
other_dynamic_types = [datetime: 0x11, set: 0x21, bfloat16: 0x31, time: 0x32]

skippable_type_codes = Keyword.values(dynamic_types ++ other_dynamic_types)

# Consumes one type header from the front of a binary and returns the rest.
# Raises ArgumentError when the leading byte is not one of the known
# single-byte type codes.
# TODO: Only supports single-byte type headers for now.
# NOTE(review): declared with `def`, so it is callable from outside the module;
# confirm whether that is intended or whether it should be private.
def skip_type(<<type, rest::bytes>>) when type in unquote(skippable_type_codes) do
  rest
end

def skip_type(<<type, _rest::bytes>>) do
  message =
    "Unsupported type definition (starting with 0x#{Base.encode16(<<type>>)}) while decoding dynamic JSON. Only single-byte type identifiers are currently supported."

  raise ArgumentError, message
end

# TODO
# Enum8 0x17 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int8_value_1>...<var_uint_name_size_N><name_data_N><int8_value_N>
# Enum16 0x18 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int16_little_endian_value_1>...><var_uint_name_size_N><name_data_N><int16_little_endian_value_N>
Expand All @@ -1151,7 +1221,6 @@ defmodule Ch.RowBinary do
# Custom type (Ring, Polygon, etc) 0x2C<var_uint_type_name_size><type_name_data>
# SimpleAggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN) 0x2E<var_uint_function_name_size><function_name_data><var_uint_number_of_parameters><param_1>...<param_N><var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N> (see aggregate function parameter binary encoding)
# Nested(name1 T1, ..., nameN TN) 0x2F<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N>
# JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp) 0x30<uint8_serialization_version><var_int_max_dynamic_paths><uint8_max_dynamic_types><var_uint_number_of_typed_paths><var_uint_path_name_size_1><path_name_data_1><encoded_type_1>...<var_uint_number_of_skip_paths><var_uint_skip_path_size_1><skip_path_data_1>...<var_uint_number_of_skip_path_regexps><var_uint_skip_path_regexp_size_1><skip_path_data_regexp_1>...

unsupported_dynamic_types = %{
"Enum8" => 0x17,
Expand All @@ -1167,8 +1236,7 @@ defmodule Ch.RowBinary do
"Dynamic" => 0x2B,
"CustomType" => 0x2C,
"SimpleAggregateFunction" => 0x2E,
"Nested" => 0x2F,
"JSON" => 0x30
"Nested" => 0x2F
}

for {type, code} <- unsupported_dynamic_types do
Expand Down
82 changes: 70 additions & 12 deletions test/ch/json_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -345,18 +345,76 @@ defmodule Ch.JSONTest do
]
]

# TODO
assert_raise ArgumentError, "unsupported dynamic type JSON", fn ->
Ch.query!(conn, "SELECT json.a.b, dynamicType(json.a.b) FROM json_test;", [], query_options)
end
assert Ch.query!(conn, "SELECT json.a.b FROM json_test;", [], query_options).rows == [
[
[
%{"c" => 42, "d" => "Hello", "f" => [[%{"g" => 42.42}]], "k" => %{"j" => 1000}},
%{"c" => 43},
%{
"d" => "My",
"e" => [1, 2, 3],
"f" => [[%{"g" => 43.43, "h" => "2020-01-01"}]],
"k" => %{"j" => 2000}
}
]
],
[[1, 2, 3]],
[
[
%{"c" => 44, "f" => [[%{"h" => "2020-01-02"}]]},
%{
"d" => "World",
"e" => [4, 5, 6],
"f" => [[%{"g" => 44.44}]],
"k" => %{"j" => 3000}
}
]
]
]

assert_raise ArgumentError, "unsupported dynamic type JSON", fn ->
Ch.query!(
conn,
"SELECT json.a.b.:`Array(JSON)`.c, json.a.b.:`Array(JSON)`.f, json.a.b.:`Array(JSON)`.d FROM json_test;",
[],
query_options
)
end
assert Ch.query!(
conn,
"SELECT json.a.b[].c, json.a.b[].f, json.a.b[].d FROM json_test;",
[],
query_options
).rows == [
[
[42, 43, nil],
[[[%{"g" => 42.42}]], nil, [[%{"g" => 43.43, "h" => "2020-01-01"}]]],
["Hello", nil, "My"]
],
[[], [], []],
[[44, nil], [[[%{"h" => "2020-01-02"}]], [[%{"g" => 44.44}]]], [nil, "World"]]
]

query_options = Keyword.put(query_options, :enable_time_time64_type, 1)

assert_raise ArgumentError,
"Unsupported type definition (starting with 0x34) while decoding dynamic JSON. Only single-byte type identifiers are currently supported.",
fn ->
Ch.query!(
conn,
~s|SELECT '{"a": "10:00:00.050"}'::JSON(a Time64)::Dynamic;|,
[],
query_options
)
end
end

# Round-trips a map through a Dynamic column: the row is inserted in RowBinary
# format with the column typed as :dynamic, then read back and compared.
test "encode JSON in dynamic column", %{conn: conn, query_options: query_options} do
  create_statement = "CREATE TABLE json_test (value Dynamic) ENGINE = Memory;"
  Ch.query!(conn, create_statement, [], query_options)

  # From here on the options also carry the column types for the RowBinary insert.
  query_options = Keyword.put(query_options, :types, [:dynamic])

  inserted_rows = [[%{"json_obj" => 42}]]
  insert_statement = "INSERT INTO json_test (value) FORMAT RowBinary"
  Ch.query!(conn, insert_statement, inserted_rows, query_options)

  # The read-back query is issued without params or options, as in the original.
  result = Ch.query!(conn, "SELECT value FROM json_test")

  assert result.rows == [
           [%{"json_obj" => 42}]
         ]
end
end
Loading