From abc70f673dba0692921345dcf7d1a3e2a0c8710e Mon Sep 17 00:00:00 2001 From: kyle-hawk Date: Sat, 14 Jan 2023 21:18:08 +0800 Subject: [PATCH 1/2] close #109:first edition --- .../bitsail/common/model/ColumnInfo.java | 7 + .../constant/ClickhouseConstants.java | 2 + .../option/ClickhouseWriterOptions.java | 57 ++++++ .../clickhouse/sink/ClickhouseSink.java | 38 ++++ .../clickhouse/sink/ClickhouseWriter.java | 180 ++++++++++++++++++ .../util/ClickhouseConnectionHolder.java | 17 ++ .../clickhouse/ClickhouseContainerHolder.java | 53 ++++-- .../sink/Clickhouse2ClickhouseITCase.java | 67 +++++++ .../source/ClickhouseReaderITCase.java | 2 +- .../resources/clickhouse_to_clickhouse.json | 105 ++++++++++ .../test/resources/clickhouse_to_print.json | 4 + .../src/test/resources/example_data.csv | 100 +++++----- 12 files changed, 565 insertions(+), 67 deletions(-) create mode 100644 bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java create mode 100644 bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java create mode 100644 bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java create mode 100644 bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/sink/Clickhouse2ClickhouseITCase.java create mode 100644 bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_clickhouse.json diff --git a/bitsail-common/src/main/java/com/bytedance/bitsail/common/model/ColumnInfo.java b/bitsail-common/src/main/java/com/bytedance/bitsail/common/model/ColumnInfo.java index 7bed7c54d..68fcd1d1d 100644 --- a/bitsail-common/src/main/java/com/bytedance/bitsail/common/model/ColumnInfo.java +++ b/bitsail-common/src/main/java/com/bytedance/bitsail/common/model/ColumnInfo.java @@ -36,6 +36,7 @@ public class ColumnInfo implements Serializable { private String comment; private Object defaultValue; private String properties; + private Boolean nullable; @Builder public ColumnInfo(String name, String type) { @@ -58,6 +59,12 @@ public ColumnInfo(String name, this.defaultValue = defaultValue; } + public ColumnInfo(String name, String type, Boolean nullable) { + this.name = name; + this.type = type; + this.nullable = nullable; + } + public String getComment() { return comment; } diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/constant/ClickhouseConstants.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/constant/ClickhouseConstants.java index a88947691..ffffc5543 100644 --- a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/constant/ClickhouseConstants.java +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/constant/ClickhouseConstants.java @@ -18,4 +18,6 @@ public class ClickhouseConstants { public static String CLICKHOUSE_CONNECTOR_NAME = "clickhouse"; + + public static String CLICKHOUSE_DECIMAL_INPUT_TYPE = "Decimal(76, 76)"; } diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java new file mode 100644 index 000000000..2fbb1852b --- /dev/null +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.clickhouse.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.WriterOptions; + +import com.alibaba.fastjson.TypeReference; + +import java.util.Map; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; + +public interface ClickhouseWriterOptions extends WriterOptions.BaseWriterOptions { + /** + * Standard format: + * jdbc:(ch|clickhouse)[:protocol]://endpoint[,endpoint][/database][?parameters][#tags]
+ * - endpoint: [protocol://]host[:port][/database][?parameters][#tags]
+ * - protocol: (grpc|grpcs|http|https|tcp|tcps) + */ + @Essential + ConfigOption JDBC_URL = + key(WRITER_PREFIX + "jdbc_url") + .noDefaultValue(String.class); + +/* ConfigOption WRITE_MODE = + key(WRITER_PREFIX + "write_mode") + .noDefaultValue(String.class); + + ConfigOption WRITER_PARALLELISM_NUM = + key(WRITER_PREFIX + "writer_parallelism_num") + .noDefaultValue(String.class);*/ + + // Connection properties. + ConfigOption> CUSTOMIZED_CONNECTION_PROPERTIES = + key(WRITER_PREFIX + "customized_connection_properties") + .onlyReference(new TypeReference>() {}); + + ConfigOption BATCH_SIZE = + key(WRITER_PREFIX + "batch_size") + .defaultValue(10); +} diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java new file mode 100644 index 000000000..82b97e68e --- /dev/null +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java @@ -0,0 +1,38 @@ +package com.bytedance.bitsail.connector.clickhouse.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Sink; +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; +import com.bytedance.bitsail.connector.clickhouse.constant.ClickhouseConstants; + +import java.io.IOException; +import java.io.Serializable; + +public class ClickhouseSink implements Sink { + private BitSailConfiguration jobConf; + + @Override + public String getWriterName() { + return ClickhouseConstants.CLICKHOUSE_CONNECTOR_NAME; + } + + @Override + public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception { + this.jobConf = writerConfiguration; + } + + @Override + public Writer createWriter(Writer.Context context) throws IOException { + return new ClickhouseWriter<>(this.jobConf, context); + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new FileMappingTypeInfoConverter(getWriterName()); + } +} + diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java new file mode 100644 index 000000000..9c72affbf --- /dev/null +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java @@ -0,0 +1,180 @@ +package com.bytedance.bitsail.connector.clickhouse.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.clickhouse.error.ClickhouseErrorCode; +import com.bytedance.bitsail.connector.clickhouse.option.ClickhouseWriterOptions; +import com.bytedance.bitsail.connector.clickhouse.source.reader.ClickhouseSourceReader; +import com.bytedance.bitsail.connector.clickhouse.util.ClickhouseConnectionHolder; + +import com.clickhouse.jdbc.ClickHouseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.domain.ClickHouseDataType; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.bytedance.bitsail.connector.clickhouse.constant.ClickhouseConstants.CLICKHOUSE_DECIMAL_INPUT_TYPE; + +public class ClickhouseWriter implements Writer { + private static final Logger LOG = LoggerFactory.getLogger(ClickhouseSourceReader.class); + + private final int subTaskId; + + private final String jdbcUrl; + private final List columnInfos; + private final String dbName; + private final String tableName; + + private final String userName; + private final String password; + /*private final String writeMode;*/ + /*private final String writerParallelismNum;*/ + private final Integer batchSize; + + private final String insertSql; + + private final transient ClickhouseConnectionHolder connectionHolder; + + private final List writeBuffer; + /*private final List commitBuffer;*/ + private final AtomicInteger printCount; + + /** + * Ensure there is only one connection activated. + */ + private transient ClickHouseConnection connection; + /** + * Ensure there is only one statement activated. + */ + private transient PreparedStatement statement; + + public ClickhouseWriter(BitSailConfiguration jobConf, Writer.Context context) { + this.subTaskId = context.getIndexOfSubTaskId(); + + this.jdbcUrl = jobConf.getNecessaryOption(ClickhouseWriterOptions.JDBC_URL, + ClickhouseErrorCode.REQUIRED_VALUE); + this.userName = jobConf.getNecessaryOption(ClickhouseWriterOptions.USER_NAME, + ClickhouseErrorCode.REQUIRED_VALUE); + this.password = jobConf.getNecessaryOption(ClickhouseWriterOptions.PASSWORD, + ClickhouseErrorCode.REQUIRED_VALUE); + + this.dbName = jobConf.getNecessaryOption(ClickhouseWriterOptions.DB_NAME, + ClickhouseErrorCode.REQUIRED_VALUE); + this.tableName = jobConf.getNecessaryOption(ClickhouseWriterOptions.TABLE_NAME, + ClickhouseErrorCode.REQUIRED_VALUE); + + /*this.writeMode = jobConf.get(ClickhouseWriterOptions.TABLE_NAME);*/ + /*this.writerParallelismNum = jobConf.get(ClickhouseWriterOptions.WRITER_PARALLELISM_NUM);*/ + this.batchSize = jobConf.get(ClickhouseWriterOptions.BATCH_SIZE); + + this.columnInfos = jobConf.getNecessaryOption(ClickhouseWriterOptions.COLUMNS, + ClickhouseErrorCode.REQUIRED_VALUE); + + connectionHolder = new ClickhouseConnectionHolder(jobConf, true); + this.connection = connectionHolder.connect(); + + insertSql = this.insertSql(); + try { + this.statement = connection.prepareStatement(insertSql); + } catch (SQLException e) { + throw new RuntimeException("Failed to prepare statement.", e); + } + + this.writeBuffer = new ArrayList<>(batchSize); + /*this.commitBuffer = new ArrayList<>(batchSize);*/ + this.printCount = new AtomicInteger(0); + + LOG.info("Clickhouse sink writer {} is initialized.", subTaskId); + } + + @Override + public void write(Row element) throws IOException { + this.writeBuffer.add(element); + + if (writeBuffer.size() == batchSize) { + this.flush(false); + } + printCount.incrementAndGet(); + } + + @Override + public void flush(boolean endOfInput) throws IOException { + /*this.commitBuffer.addAll(this.writeBuffer);*/ + + try { + for (Row ele : writeBuffer) { + for (int i = 0; i < this.columnInfos.size(); i++) { + statement.setObject(i + 1, ele.getField(i)); + } + statement.addBatch(); + } + statement.executeBatch(); + } catch (SQLException e) { + throw new RuntimeException("Failed to write batch.", e); + } + + writeBuffer.clear(); + if (endOfInput) { + LOG.info("all records are sent to commit buffer."); + } + } + + @Override + public List prepareCommit() throws IOException { + return Collections.emptyList(); + } + + @Override + public void close() throws IOException { + Writer.super.close(); + + try { + this.statement.close(); + this.connection.close(); + } catch (SQLException e) { + throw new RuntimeException("Failed to close clickhouse writer.", e); + } + } + + private String insertSql() { + String inputFields = this.columnInfos.stream().map(col -> { + if (Objects.isNull(col.getNullable()) || !col.getNullable()) { + return String.format("%s %s", col.getName(), this.getClickhouseType(col.getType())); + } else { + return String.format("%s Nullable(%s)", col.getName(), this.getClickhouseType(col.getType())); + } + }).reduce((n1, n2) -> String.format("%s, %s", n1, n2)).get(); + + StringBuffer sql = new StringBuffer("INSERT INTO "); + sql.append(String.format("%s.%s", this.dbName, this.tableName)) + .append(" SELECT ") + .append(this.columnInfos.stream().map(col -> col.getName()).reduce((n1, n2) -> String.format("%s, %s", n1, n2)).get()) + .append(" FROM input('") + .append(inputFields) + .append("')"); + + return sql.toString(); + } + + private String getClickhouseType(String inputType) { + ClickHouseDataType ckType = ClickHouseDataType.fromTypeString(inputType); + String strType = ckType.toString(); + + if (strType.toLowerCase().contains("decimal")) { + return CLICKHOUSE_DECIMAL_INPUT_TYPE; + } + return strType; + } + +} \ No newline at end of file diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/util/ClickhouseConnectionHolder.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/util/ClickhouseConnectionHolder.java index 91442939d..99eafce9e 100644 --- a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/util/ClickhouseConnectionHolder.java +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/util/ClickhouseConnectionHolder.java @@ -19,6 +19,7 @@ import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.connector.clickhouse.error.ClickhouseErrorCode; import com.bytedance.bitsail.connector.clickhouse.option.ClickhouseReaderOptions; +import com.bytedance.bitsail.connector.clickhouse.option.ClickhouseWriterOptions; import com.clickhouse.jdbc.ClickHouseConnection; import com.clickhouse.jdbc.ClickHouseDataSource; @@ -57,6 +58,22 @@ public ClickhouseConnectionHolder(BitSailConfiguration jobConf) { } } + public ClickhouseConnectionHolder(BitSailConfiguration jobConf, Boolean isWriter) { + this.jdbcUrl = jobConf.getNecessaryOption(ClickhouseWriterOptions.JDBC_URL, + ClickhouseErrorCode.REQUIRED_VALUE); + this.dbName = jobConf.getNecessaryOption(ClickhouseWriterOptions.DB_NAME, + ClickhouseErrorCode.REQUIRED_VALUE); + + this.userName = jobConf.get(ClickhouseWriterOptions.USER_NAME); + this.password = jobConf.get(ClickhouseWriterOptions.PASSWORD); + + this.connectionProperties = new Properties(); + if (jobConf.fieldExists(ClickhouseWriterOptions.CUSTOMIZED_CONNECTION_PROPERTIES)) { + jobConf.get(ClickhouseWriterOptions.CUSTOMIZED_CONNECTION_PROPERTIES) + .forEach(connectionProperties::setProperty); + } + } + /** * Refrence: Clickhouse-jdbc */ diff --git a/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/ClickhouseContainerHolder.java b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/ClickhouseContainerHolder.java index eb5c268d5..5a6d05b5f 100644 --- a/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/ClickhouseContainerHolder.java +++ b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/ClickhouseContainerHolder.java @@ -49,12 +49,14 @@ public class ClickhouseContainerHolder { private static final String CLICKHOUSE_IMAGE_NAME = "clickhouse/clickhouse-server:22-alpine"; private static final String DATABASE = "default"; - private static final String TABLE = "test_ch_table"; + private static final String TABLE_SOURCE = "test_ch_table"; + private static final String TABLE_SINK = "test_ch_table_sink"; private static final int HTTP_PORT = 8123; - private static final String DROP_TABLE_SQL = "DROP TABLE IF EXISTS " + TABLE; - private static final String CREATE_TABLE_SQL = - "CREATE TABLE " + TABLE + "\n" + + private static final String DROP_SOURCE_TABLE_SQL = "DROP TABLE IF EXISTS " + TABLE_SOURCE; + private static final String DROP_SINK_TABLE_SQL = "DROP TABLE IF EXISTS " + TABLE_SINK; + private static final String CREATE_SOURCE_TABLE_SQL = + "CREATE TABLE " + TABLE_SOURCE + "\n" + "(\n" + " `id` `Int64`,\n" + " `int_type` `Int32`,\n" + @@ -62,14 +64,27 @@ public class ClickhouseContainerHolder { " `string_type` `String`,\n" + " `p_date` `Date`,\n" + " `int_128` `Int128`,\n" + - " `int_256` `Int256`\n" + + " `int_256` `Int256`,\n" + + " `decimal_type` Decimal256(76)\n" + ") ENGINE = MergeTree()\n" + "PARTITION BY toYYYYMM(p_date)\n" + "PRIMARY KEY (id);"; - private static final String INSERT_SQL_HEADER = "INSERT INTO " + TABLE - + " (id, int_type, double_type, string_type, p_date, int_128, int_256) VALUES "; - private static final String COUNT_SQL = "SELECT count(id) FROM " + TABLE; - private static final String SCAN_SQL = "SELECT id, int_type, double_type, string_type, p_date, int_128, int_256 from " + TABLE + " ORDER BY id"; + private static final String CREATE_SINK_TABLE_SQL = + "CREATE TABLE " + TABLE_SINK + "\n" + + "(\n" + + " `id` Nullable(Int64),\n" + + " `int_type` Nullable(Int32),\n" + + " `double_type` Nullable(Float64),\n" + + " `string_type` Nullable(String),\n" + + " `p_date` Nullable(Date),\n" + + " `int_128` Nullable(Int128),\n" + + " `int_256` Nullable(Int256),\n" + + " `decimal_type` Nullable(Decimal256(76))\n" + + ") ENGINE = TinyLog\n"; + private static final String INSERT_SQL_HEADER = "INSERT INTO " + TABLE_SOURCE + + " (id, int_type, double_type, string_type, p_date, int_128, int_256,decimal_type) VALUES "; + private static final String COUNT_SQL = "SELECT count(id) FROM " + TABLE_SOURCE; + private static final String SCAN_SQL = "SELECT id, int_type, double_type, string_type, p_date, int_128, int_256,decimal_type from " + TABLE_SOURCE + " ORDER BY id"; private static final int INSERT_BATCH_SIZE = 10; private ClickHouseContainer container; @@ -99,7 +114,7 @@ public String getDatabase() { } public String getTable() { - return TABLE; + return TABLE_SOURCE; } public String getUsername() { @@ -110,9 +125,14 @@ public String getPassword() { return container.getPassword(); } - public void createExampleTable() throws SQLException { - performQuery(DROP_TABLE_SQL); - performQuery(CREATE_TABLE_SQL); + public void createExampleSourceTable() throws SQLException { + performQuery(DROP_SOURCE_TABLE_SQL); + performQuery(CREATE_SOURCE_TABLE_SQL); + } + + public void createExampleSinkTable() throws SQLException { + performQuery(DROP_SINK_TABLE_SQL); + performQuery(CREATE_SINK_TABLE_SQL); } public long countTable() throws SQLException { @@ -138,7 +158,7 @@ public void insertData(int totalCount) throws Exception { String insertSql = values.stream().collect(Collectors.joining(", ", INSERT_SQL_HEADER, ";")); performQuery(insertSql); } - LOG.info("Successfully insert {} rows into table [{}]", totalCount, TABLE); + LOG.info("Successfully insert {} rows into table [{}]", totalCount, TABLE_SOURCE); } public void importData() throws Exception { @@ -148,7 +168,7 @@ public void importData() throws Exception { try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { client.connect(server).write() - .table(TABLE) + .table(TABLE_SOURCE) .data(file) .executeAndWait(); } @@ -171,7 +191,8 @@ private String generateRow(int index) throws Exception { "'text_" + index + "'", // string_type "'" + sdf.format(calendar.getTime()) + "'", // date_type new BigInteger("17014118346046923173168730371588").toString(), // int128 - new BigInteger("578960446186580977117854925043439539266349923328202820").toString() // int256 + new BigInteger("578960446186580977117854925043439539266349923328202820").toString(), // int256 + Double.valueOf((100000.0 + index) / 1000000.0).toString() //decimal_type ); return data.stream().collect(Collectors.joining(", ", "(", ")")); } diff --git a/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/sink/Clickhouse2ClickhouseITCase.java b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/sink/Clickhouse2ClickhouseITCase.java new file mode 100644 index 000000000..809c5c530 --- /dev/null +++ b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/sink/Clickhouse2ClickhouseITCase.java @@ -0,0 +1,67 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.clickhouse.sink; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.clickhouse.ClickhouseContainerHolder; +import com.bytedance.bitsail.connector.clickhouse.option.ClickhouseReaderOptions; +import com.bytedance.bitsail.connector.clickhouse.option.ClickhouseWriterOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class Clickhouse2ClickhouseITCase { + private static final int TOTAL_COUNT = 100; + + private ClickhouseContainerHolder containerHolder; + + @Before + public void initClickhouse() throws Exception { + containerHolder = new ClickhouseContainerHolder(); + containerHolder.start(); + containerHolder.createExampleSourceTable(); + containerHolder.insertData(TOTAL_COUNT); + + containerHolder.createExampleSinkTable(); + + } + + @Test + public void testClickhouseToPrint() throws Exception { + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("clickhouse_to_clickhouse.json"); + + jobConf.set(ClickhouseReaderOptions.JDBC_URL, containerHolder.getJdbcHostUrl()); + jobConf.set(ClickhouseReaderOptions.USER_NAME, containerHolder.getUsername()); + jobConf.set(ClickhouseReaderOptions.PASSWORD, containerHolder.getPassword()); + + jobConf.set(ClickhouseWriterOptions.JDBC_URL, containerHolder.getJdbcHostUrl()); + jobConf.set(ClickhouseWriterOptions.USER_NAME, containerHolder.getUsername()); + jobConf.set(ClickhouseWriterOptions.PASSWORD, containerHolder.getPassword()); + + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @After + public void close() { + if (containerHolder != null) { + containerHolder.close(); + } + } +} diff --git a/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/source/ClickhouseReaderITCase.java b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/source/ClickhouseReaderITCase.java index bd019e70e..89fd1554d 100644 --- a/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/source/ClickhouseReaderITCase.java +++ b/bitsail-connectors/connector-clickhouse/src/test/java/com/bytedance/bitsail/connector/clickhouse/source/ClickhouseReaderITCase.java @@ -37,7 +37,7 @@ public class ClickhouseReaderITCase { public void initClickhouse() throws Exception { containerHolder = new ClickhouseContainerHolder(); containerHolder.start(); - containerHolder.createExampleTable(); + containerHolder.createExampleSourceTable(); containerHolder.insertData(TOTAL_COUNT); } diff --git a/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_clickhouse.json b/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_clickhouse.json new file mode 100644 index 000000000..adc674d59 --- /dev/null +++ b/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_clickhouse.json @@ -0,0 +1,105 @@ +{ + "job": { + "common": { + "job_id": 312, + "instance_id": 3124, + "job_name": "BitSail_fake_to_print_test", + "user_name": "test" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource", + "jdbc_url": "jdbc:clickhouse://localhost:8123", + "db_name": "default", + "table_name": "test_ch_table", + "split_field": "id", + "split_config": "{\"name\": \"id\", \"lower_bound\": 0, \"upper_bound\": \"10000\", \"split_num\": 3}", + "sql_filter": "( id % 2 == 0 )", + "columns": [ + { + "name": "id", + "type": "int64" + }, + { + "name": "int_type", + "type": "int32" + }, + { + "name": "double_type", + "type": "float64" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + "type": "date" + }, + { + "name": "int_128", + "type": "int128" + }, + { + "name": "int_256", + "type": "int256" + }, + { + "name": "decimal_type", + "type": "decimal" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.clickhouse.sink.ClickhouseSink", + "jdbc_url": "jdbc:clickhouse://localhost:8123", + "db_name": "default", + "table_name": "test_ch_table_sink", + "user_name": "12345677", + "password": "test_001", + "write_mode": "overwrite", + "writer_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "int64", + "nullable": true + }, + { + "name": "int_type", + "type": "int32", + "nullable": true + }, + { + "name": "double_type", + "type": "float64", + "nullable": true + }, + { + "name": "string_type", + "type": "string", + "nullable": true + }, + { + "name": "p_date", + "type": "date", + "nullable": true + }, + { + "name": "int_128", + "type": "int128", + "nullable": false + }, + { + "name": "int_256", + "type": "int256", + "nullable": true + }, + { + "name": "decimal_type", + "type": "decimal", + "nullable": false + } + ] + } + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_print.json b/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_print.json index b167c35c6..61a62c6c4 100644 --- a/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_print.json +++ b/bitsail-connectors/connector-clickhouse/src/test/resources/clickhouse_to_print.json @@ -42,6 +42,10 @@ { "name": "int_256", "type": "int256" + }, + { + "name": "decimal_type", + "type": "decimal" } ] }, diff --git a/bitsail-connectors/connector-clickhouse/src/test/resources/example_data.csv b/bitsail-connectors/connector-clickhouse/src/test/resources/example_data.csv index 3d6a6f964..2ab3fa435 100644 --- a/bitsail-connectors/connector-clickhouse/src/test/resources/example_data.csv +++ b/bitsail-connectors/connector-clickhouse/src/test/resources/example_data.csv @@ -1,50 +1,50 @@ -1,100001,100.001,text_0001,2020-01-01 -2,100002,100.002,text_0002,2020-01-02 -3,100003,100.003,text_0003,2020-01-03 -4,100004,100.004,text_0004,2020-01-04 -5,100005,100.005,text_0005,2020-01-05 -6,100006,100.006,text_0006,2020-01-06 -7,100007,100.007,text_0007,2020-01-07 -8,100008,100.008,text_0008,2020-01-08 -9,100009,100.009,text_0009,2020-01-09 -10,100010,100.01,text_0010,2020-01-10 -11,100011,100.011,text_0011,2020-01-11 -12,100012,100.012,text_0012,2020-01-12 -13,100013,100.013,text_0013,2020-01-13 -14,100014,100.014,text_0014,2020-01-14 -15,100015,100.015,text_0015,2020-01-15 -16,100016,100.016,text_0016,2020-01-16 -17,100017,100.017,text_0017,2020-01-17 -18,100018,100.018,text_0018,2020-01-18 -19,100019,100.019,text_0019,2020-01-19 -20,100020,100.02,text_0020,2020-01-20 -21,100021,100.021,text_0021,2020-01-21 -22,100022,100.022,text_0022,2020-01-22 -23,100023,100.023,text_0023,2020-01-23 -24,100024,100.024,text_0024,2020-01-24 -25,100025,100.025,text_0025,2020-01-25 -26,100026,100.026,text_0026,2020-01-26 -27,100027,100.027,text_0027,2020-01-27 -28,100028,100.028,text_0028,2020-01-28 -29,100029,100.029,text_0029,2020-01-29 -30,100030,100.03,text_0030,2020-01-30 -31,100031,100.031,text_0031,2020-01-31 -32,100032,100.032,text_0032,2020-02-01 -33,100033,100.033,text_0033,2020-02-02 -34,100034,100.034,text_0034,2020-02-03 -35,100035,100.035,text_0035,2020-02-04 -36,100036,100.036,text_0036,2020-02-05 -37,100037,100.037,text_0037,2020-02-06 -38,100038,100.038,text_0038,2020-02-07 -39,100039,100.039,text_0039,2020-02-08 -40,100040,100.04,text_0040,2020-02-09 -41,100041,100.041,text_0041,2020-02-10 -42,100042,100.042,text_0042,2020-02-11 -43,100043,100.043,text_0043,2020-02-12 -44,100044,100.044,text_0044,2020-02-13 -45,100045,100.045,text_0045,2020-02-14 -46,100046,100.046,text_0046,2020-02-15 -47,100047,100.047,text_0047,2020-02-16 -48,100048,100.048,text_0048,2020-02-17 -49,100049,100.049,text_0049,2020-02-18 -50,100050,100.05,text_0050,2020-02-19 \ No newline at end of file +1,100001,100.001,text_0001,2020-01-01,100.001 +2,100002,100.002,text_0002,2020-01-02,100.002 +3,100003,100.003,text_0003,2020-01-03,100.003 +4,100004,100.004,text_0004,2020-01-04,100.004 +5,100005,100.005,text_0005,2020-01-05,100.005 +6,100006,100.006,text_0006,2020-01-06,100.006 +7,100007,100.007,text_0007,2020-01-07,100.007 +8,100008,100.008,text_0008,2020-01-08,100.008 +9,100009,100.009,text_0009,2020-01-09,100.009 +10,100010,100.01,text_0010,2020-01-10,100.01 +11,100011,100.011,text_0011,2020-01-11,100.011 +12,100012,100.012,text_0012,2020-01-12,100.012 +13,100013,100.013,text_0013,2020-01-13,100.013 +14,100014,100.014,text_0014,2020-01-14,100.014 +15,100015,100.015,text_0015,2020-01-15,100.015 +16,100016,100.016,text_0016,2020-01-16,100.016 +17,100017,100.017,text_0017,2020-01-17,100.017 +18,100018,100.018,text_0018,2020-01-18,100.018 +19,100019,100.019,text_0019,2020-01-19,100.019 +20,100020,100.02,text_0020,2020-01-20,100.02 +21,100021,100.021,text_0021,2020-01-21,100.021 +22,100022,100.022,text_0022,2020-01-22,100.022 +23,100023,100.023,text_0023,2020-01-23,100.023 +24,100024,100.024,text_0024,2020-01-24,100.024 +25,100025,100.025,text_0025,2020-01-25,100.025 +26,100026,100.026,text_0026,2020-01-26,100.026 +27,100027,100.027,text_0027,2020-01-27,100.027 +28,100028,100.028,text_0028,2020-01-28,100.028 +29,100029,100.029,text_0029,2020-01-29,100.029 +30,100030,100.03,text_0030,2020-01-30,100.03 +31,100031,100.031,text_0031,2020-01-31,100.031 +32,100032,100.032,text_0032,2020-02-01,100.032 +33,100033,100.033,text_0033,2020-02-02,100.033 +34,100034,100.034,text_0034,2020-02-03,100.034 +35,100035,100.035,text_0035,2020-02-04,100.035 +36,100036,100.036,text_0036,2020-02-05,100.036 +37,100037,100.037,text_0037,2020-02-06,100.037 +38,100038,100.038,text_0038,2020-02-07,100.038 +39,100039,100.039,text_0039,2020-02-08,100.039 +40,100040,100.04,text_0040,2020-02-09,100.04 +41,100041,100.041,text_0041,2020-02-10,100.041 +42,100042,100.042,text_0042,2020-02-11,100.042 +43,100043,100.043,text_0043,2020-02-12,100.043 +44,100044,100.044,text_0044,2020-02-13,100.044 +45,100045,100.045,text_0045,2020-02-14,100.045 +46,100046,100.046,text_0046,2020-02-15,100.046 +47,100047,100.047,text_0047,2020-02-16,100.047 +48,100048,100.048,text_0048,2020-02-17,100.048 +49,100049,100.049,text_0049,2020-02-18,100.049 +50,100050,100.05,text_0050,2020-02-19,100.05 \ No newline at end of file From b72f8d12d10ea3003c7cd93f0a89e7bcab94cd81 Mon Sep 17 00:00:00 2001 From: kyle-hawk Date: Mon, 16 Jan 2023 16:47:12 +0800 Subject: [PATCH 2/2] #109 resolve License header problem --- .../option/ClickhouseWriterOptions.java | 25 ++++++++++--------- .../clickhouse/sink/ClickhouseSink.java | 16 ++++++++++++ .../clickhouse/sink/ClickhouseWriter.java | 16 ++++++++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java index 2fbb1852b..8af240eab 100644 --- a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/option/ClickhouseWriterOptions.java @@ -1,16 +1,17 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.bytedance.bitsail.connector.clickhouse.option; diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java index 82b97e68e..ac996870f 100644 --- a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseSink.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.bytedance.bitsail.connector.clickhouse.sink; import com.bytedance.bitsail.base.connector.writer.v1.Sink; diff --git a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java index 9c72affbf..5a4567b2c 100644 --- a/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java +++ b/bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/sink/ClickhouseWriter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.bytedance.bitsail.connector.clickhouse.sink; import com.bytedance.bitsail.base.connector.writer.v1.Writer;