This repository was archived by the owner on Jan 12, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 334
[BitSail][Connector] Migrate rocketmq Sink connector to V1 interface #457
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
c074de2
init RocketMQ sink v1
beyond-up e212788
modify RocketMQ writer
beyond-up 367058e
Merge branch 'bytedance:master' into rocketmq_sink_v1
beyond-up 617699f
add test for RocketMQSink
beyond-up eadce0b
Merge branch 'rocketmq_sink_v1' of github.com:beyond-up/bitsail into …
beyond-up 7c3d27b
add header
beyond-up c07928a
add unit test for HashQueueSelector and optimize producer config and …
beyond-up 52d61f5
Merge branch 'bytedance:master' into rocketmq_sink_v1
beyond-up 850fb2b
optimize rocketmq producer config
beyond-up 144f3f7
optimize rocketmq write config
beyond-up 4ee86fa
Merge branch 'rocketmq_sink_v1' of github.com:beyond-up/bitsail into …
beyond-up File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
25 changes: 25 additions & 0 deletions
25
.../main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.constants; | ||
|
|
||
| public class OptionalProducerConfig { | ||
| public static final String INSTANCE_NAME = "instanceName"; | ||
| public static final String VIP_CHANNEL = "vipChannelEnabled"; | ||
| public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums"; | ||
| public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch"; | ||
| public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval"; | ||
| } | ||
23 changes: 23 additions & 0 deletions
23
...q/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.constants; | ||
|
|
||
| public class RocketMQConstants { | ||
|
|
||
| public static final String CONNECTOR_NAME = "rocketmq"; | ||
|
|
||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
...java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.format; | ||
|
|
||
| import com.bytedance.bitsail.common.configuration.BitSailConfiguration; | ||
| import com.bytedance.bitsail.common.row.Row; | ||
| import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; | ||
| import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema { | ||
| private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class); | ||
|
|
||
| private static final long serialVersionUID = 3L; | ||
|
|
||
| private final List<Integer> partitionKeyIndices; | ||
| private final List<Integer> keyIndices; | ||
| private final transient JsonRowSerializationSchema rowSerializationSchema; | ||
|
|
||
| public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo, | ||
| List<Integer> partitionKeyIndices, List<Integer> keyIndices) { | ||
| this.partitionKeyIndices = partitionKeyIndices; | ||
| this.keyIndices = keyIndices; | ||
| this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo); | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] serializeKey(Row row) { | ||
| if (keyIndices != null) { | ||
| String key = this.keyIndices.stream() | ||
| .map(i -> { | ||
| Object keyField = row.getField(i); | ||
| if (keyField != null) { | ||
| return keyField.toString(); | ||
| } | ||
| LOG.warn("Found null key in row: [{}]", row); | ||
| return null; | ||
| }) | ||
| .filter(StringUtils::isNotEmpty) | ||
| .collect(Collectors.joining()); | ||
| return key.getBytes(StandardCharsets.UTF_8); | ||
| } else { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] serializeValue(Row row) { | ||
| return rowSerializationSchema.serialize(row); | ||
| } | ||
|
|
||
| @Override | ||
| public String getPartitionKey(Row row) { | ||
| if (partitionKeyIndices != null) { | ||
| return this.partitionKeyIndices.stream() | ||
| .map(i -> { | ||
| Object partitionField = row.getField(i); | ||
| if (partitionField != null) { | ||
| return partitionField.toString(); | ||
| } | ||
| LOG.warn("Found null key in row: [{}]", row); | ||
| return null; | ||
| }) | ||
| .filter(StringUtils::isNotEmpty) | ||
| .collect(Collectors.joining()); | ||
| } else { | ||
| return null; | ||
| } | ||
| } | ||
| } | ||
|
|
46 changes: 46 additions & 0 deletions
46
...in/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.format; | ||
|
|
||
| import com.bytedance.bitsail.common.BitSailException; | ||
| import com.bytedance.bitsail.common.configuration.BitSailConfiguration; | ||
| import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; | ||
| import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; | ||
| import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| public class RocketMQSerializationFactory { | ||
|
|
||
| RowTypeInfo rowTypeInfo; | ||
| List<Integer> partitionIndices; | ||
| List<Integer> keyIndices; | ||
|
|
||
| public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List<Integer> partitionIndices, List<Integer> keyIndices) { | ||
| this.rowTypeInfo = rowTypeInfo; | ||
| this.partitionIndices = partitionIndices; | ||
| this.keyIndices = keyIndices; | ||
| } | ||
|
|
||
| public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) { | ||
| if (format == RocketMQSinkFormat.JSON) { | ||
| return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices); | ||
| } | ||
| throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT, | ||
| "unsupported sink format: " + format); | ||
| } | ||
| } |
30 changes: 30 additions & 0 deletions
30
...ain/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.format; | ||
|
|
||
| import com.bytedance.bitsail.common.row.Row; | ||
|
|
||
| public interface RocketMQSerializationSchema { | ||
|
|
||
| byte[] serializeKey(Row row); | ||
|
|
||
| byte[] serializeValue(Row row); | ||
|
|
||
| default String getPartitionKey(Row row) { | ||
| return null; | ||
| } | ||
| } |
115 changes: 115 additions & 0 deletions
115
.../src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.option; | ||
|
|
||
| import com.bytedance.bitsail.common.annotation.Essential; | ||
| import com.bytedance.bitsail.common.option.ConfigOption; | ||
| import com.bytedance.bitsail.common.option.WriterOptions; | ||
|
|
||
| import static com.bytedance.bitsail.common.option.ConfigOptions.key; | ||
| import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; | ||
|
|
||
| public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { | ||
|
|
||
| @Essential | ||
| ConfigOption<String> NAME_SERVER_ADDRESS = | ||
| key(WRITER_PREFIX + "name_server_address") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<String> PRODUCER_GROUP = | ||
| key(WRITER_PREFIX + "producer_group") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| @Essential | ||
| ConfigOption<String> TOPIC = | ||
| key(WRITER_PREFIX + "topic") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<String> TAG = | ||
| key(WRITER_PREFIX + "tag") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<Boolean> ENABLE_BATCH_FLUSH = | ||
| key(WRITER_PREFIX + "enable_batch_flush") | ||
| .defaultValue(true); | ||
|
|
||
| ConfigOption<Integer> BATCH_SIZE = | ||
| key(WRITER_PREFIX + "batch_size") | ||
| .defaultValue(100); | ||
|
|
||
| /** | ||
| * when encounter errors while sending:<br/> | ||
| * true: log the error<br/> | ||
| * false: throw exceptions | ||
| */ | ||
| ConfigOption<Boolean> LOG_FAILURES_ONLY = | ||
| key(WRITER_PREFIX + "log_failures_only") | ||
| .defaultValue(false); | ||
|
|
||
| ConfigOption<Boolean> ENABLE_SYNC_SEND = | ||
| key(WRITER_PREFIX + "enable_sync_send") | ||
| .defaultValue(false); | ||
|
|
||
| ConfigOption<String> ACCESS_KEY = | ||
| key(WRITER_PREFIX + "access_key") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<String> SECRET_KEY = | ||
| key(WRITER_PREFIX + "secret_key") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<Integer> SEND_FAILURE_RETRY_TIMES = | ||
| key(WRITER_PREFIX + "send_failure_retry_times") | ||
| .defaultValue(3); | ||
|
|
||
| ConfigOption<Integer> SEND_MESSAGE_TIMEOUT = | ||
| key(WRITER_PREFIX + "send_message_timeout_ms") | ||
| .defaultValue(3000); | ||
|
|
||
| ConfigOption<Integer> MAX_MESSAGE_SIZE = | ||
| key(WRITER_PREFIX + "max_message_size_bytes") | ||
| .defaultValue(4194304); | ||
|
|
||
| ConfigOption<String> KEY_FIELDS = | ||
| key(WRITER_PREFIX + "key") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<String> PARTITION_FIELDS = | ||
| key(WRITER_PREFIX + "partition_fields") | ||
| .noDefaultValue(String.class); | ||
|
|
||
| ConfigOption<String> FORMAT = | ||
| key(WRITER_PREFIX + "format") | ||
| .defaultValue("json"); | ||
|
|
||
| ConfigOption<Integer> DEFAULT_TOPIC_QUEUE_NUMS = | ||
| key(WRITER_PREFIX + "default_topic_queue_nums") | ||
| .defaultValue(4); | ||
|
|
||
| ConfigOption<Integer> COMPRESS_MSG_BODY_SIZE = | ||
| key(WRITER_PREFIX + "compress_msg_body_over_how_much") | ||
| .defaultValue(4096); | ||
|
|
||
| ConfigOption<Integer> HEART_BEAT_BROKER_INTERVAL = | ||
| key(WRITER_PREFIX + "heart_beat_broker_interval") | ||
| .defaultValue(30000); | ||
|
|
||
| ConfigOption<Boolean> VIP_CHANNEL_ENABLED = | ||
| key(WRITER_PREFIX + "vip_channel_enabled") | ||
| .defaultValue(false); | ||
|
|
||
| } |
48 changes: 48 additions & 0 deletions
48
...cketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.bytedance.bitsail.connector.rocketmq.sink; | ||
|
|
||
| import org.apache.rocketmq.client.producer.MessageQueueSelector; | ||
| import org.apache.rocketmq.common.message.Message; | ||
| import org.apache.rocketmq.common.message.MessageQueue; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| public class HashQueueSelector implements MessageQueueSelector { | ||
beyond-up marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private int nullKeyCount; | ||
|
|
||
| public HashQueueSelector() { | ||
| super(); | ||
| nullKeyCount = 0; | ||
| } | ||
|
|
||
| @Override | ||
| public MessageQueue select(List<MessageQueue> mqList, Message message, Object partitionKeys) { | ||
| int queueId; | ||
|
|
||
| if (partitionKeys != null) { | ||
| queueId = partitionKeys.hashCode() % mqList.size(); | ||
| } else { | ||
| queueId = nullKeyCount % mqList.size(); | ||
| nullKeyCount = (nullKeyCount + 1) % mqList.size(); | ||
| } | ||
|
|
||
| return mqList.get(queueId); | ||
| } | ||
| } | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.