Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.constants;

public class OptionalProducerConfig {
public static final String INSTANCE_NAME = "instanceName";
public static final String VIP_CHANNEL = "vipChannelEnabled";
public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums";
public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch";
public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.constants;

public class RocketMQConstants {

public static final String CONNECTOR_NAME = "rocketmq";

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public enum RocketMQErrorCode implements ErrorCode {

CONSUMER_CREATE_FAILED("RocketMQ-1", "RocketMQ Consumer create failed."),
CONSUMER_FETCH_OFFSET_FAILED("RocketMQ-2", "RocketMQ Consumer fetch offset failed."),
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed.");
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."),
REQUIRED_VALUE("RocketMQ-4", "You missed parameter which is required, please check your configuration."),
UNSUPPORTED_FORMAT("RocketMQ-5", "Unsupported output format.");

public String code;
public String description;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema {
private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class);

private static final long serialVersionUID = 3L;

private final List<Integer> partitionKeyIndices;
private final List<Integer> keyIndices;
private final transient JsonRowSerializationSchema rowSerializationSchema;

public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo,
List<Integer> partitionKeyIndices, List<Integer> keyIndices) {
this.partitionKeyIndices = partitionKeyIndices;
this.keyIndices = keyIndices;
this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo);
}

@Override
public byte[] serializeKey(Row row) {
if (keyIndices != null) {
String key = this.keyIndices.stream()
.map(i -> {
Object keyField = row.getField(i);
if (keyField != null) {
return keyField.toString();
}
LOG.warn("Found null key in row: [{}]", row);
return null;
})
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining());
return key.getBytes(StandardCharsets.UTF_8);
} else {
return null;
}
}

@Override
public byte[] serializeValue(Row row) {
return rowSerializationSchema.serialize(row);
}

@Override
public String getPartitionKey(Row row) {
if (partitionKeyIndices != null) {
return this.partitionKeyIndices.stream()
.map(i -> {
Object partitionField = row.getField(i);
if (partitionField != null) {
return partitionField.toString();
}
LOG.warn("Found null key in row: [{}]", row);
return null;
})
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining());
} else {
return null;
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode;
import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat;

import java.util.List;

public class RocketMQSerializationFactory {

RowTypeInfo rowTypeInfo;
List<Integer> partitionIndices;
List<Integer> keyIndices;

public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List<Integer> partitionIndices, List<Integer> keyIndices) {
this.rowTypeInfo = rowTypeInfo;
this.partitionIndices = partitionIndices;
this.keyIndices = keyIndices;
}

public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) {
if (format == RocketMQSinkFormat.JSON) {
return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices);
}
throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT,
"unsupported sink format: " + format);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.row.Row;

public interface RocketMQSerializationSchema {

byte[] serializeKey(Row row);

byte[] serializeValue(Row row);

default String getPartitionKey(Row row) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.option;

import com.bytedance.bitsail.common.annotation.Essential;
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.WriterOptions;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;

public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions {

@Essential
ConfigOption<String> NAME_SERVER_ADDRESS =
key(WRITER_PREFIX + "name_server_address")
.noDefaultValue(String.class);

ConfigOption<String> PRODUCER_GROUP =
key(WRITER_PREFIX + "producer_group")
.noDefaultValue(String.class);

@Essential
ConfigOption<String> TOPIC =
key(WRITER_PREFIX + "topic")
.noDefaultValue(String.class);

ConfigOption<String> TAG =
key(WRITER_PREFIX + "tag")
.noDefaultValue(String.class);

ConfigOption<Boolean> ENABLE_BATCH_FLUSH =
key(WRITER_PREFIX + "enable_batch_flush")
.defaultValue(true);

ConfigOption<Integer> BATCH_SIZE =
key(WRITER_PREFIX + "batch_size")
.defaultValue(100);

/**
* when encounter errors while sending:<br/>
* true: log the error<br/>
* false: throw exceptions
*/
ConfigOption<Boolean> LOG_FAILURES_ONLY =
key(WRITER_PREFIX + "log_failures_only")
.defaultValue(false);

ConfigOption<Boolean> ENABLE_SYNC_SEND =
key(WRITER_PREFIX + "enable_sync_send")
.defaultValue(false);

ConfigOption<String> ACCESS_KEY =
key(WRITER_PREFIX + "access_key")
.noDefaultValue(String.class);

ConfigOption<String> SECRET_KEY =
key(WRITER_PREFIX + "secret_key")
.noDefaultValue(String.class);

ConfigOption<Integer> SEND_FAILURE_RETRY_TIMES =
key(WRITER_PREFIX + "send_failure_retry_times")
.defaultValue(3);

ConfigOption<Integer> SEND_MESSAGE_TIMEOUT =
key(WRITER_PREFIX + "send_message_timeout_ms")
.defaultValue(3000);

ConfigOption<Integer> MAX_MESSAGE_SIZE =
key(WRITER_PREFIX + "max_message_size_bytes")
.defaultValue(4194304);

ConfigOption<String> KEY_FIELDS =
key(WRITER_PREFIX + "key")
.noDefaultValue(String.class);

ConfigOption<String> PARTITION_FIELDS =
key(WRITER_PREFIX + "partition_fields")
.noDefaultValue(String.class);

ConfigOption<String> FORMAT =
key(WRITER_PREFIX + "format")
.defaultValue("json");

ConfigOption<Integer> DEFAULT_TOPIC_QUEUE_NUMS =
key(WRITER_PREFIX + "default_topic_queue_nums")
.defaultValue(4);

ConfigOption<Integer> COMPRESS_MSG_BODY_SIZE =
key(WRITER_PREFIX + "compress_msg_body_over_how_much")
.defaultValue(4096);

ConfigOption<Integer> HEART_BEAT_BROKER_INTERVAL =
key(WRITER_PREFIX + "heart_beat_broker_interval")
.defaultValue(30000);

ConfigOption<Boolean> VIP_CHANNEL_ENABLED =
key(WRITER_PREFIX + "vip_channel_enabled")
.defaultValue(false);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.sink;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class HashQueueSelector implements MessageQueueSelector {

private int nullKeyCount;

public HashQueueSelector() {
super();
nullKeyCount = 0;
}

@Override
public MessageQueue select(List<MessageQueue> mqList, Message message, Object partitionKeys) {
int queueId;

if (partitionKeys != null) {
queueId = partitionKeys.hashCode() % mqList.size();
} else {
queueId = nullKeyCount % mqList.size();
nullKeyCount = (nullKeyCount + 1) % mqList.size();
}

return mqList.get(queueId);
}
}

Loading