diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml
index ae931911..d8559c9a 100644
--- a/rocketmq-v5-client-spring-boot-samples/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
pom
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-spring-boot-samples
rocketmq-v5-client-spring-boot-samples
@@ -35,7 +35,9 @@
rocketmq-v5-client-producer-simple-demo
rocketmq-v5-client-consumer-simple-demo
rocketmq-v5-client-consumer-push-simple-demo
-
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+
1.8
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
index a73027b6..cc600607 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consume-acl-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
index 0b2c891b..2b5d46a8 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consume-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml
new file mode 100644
index 00000000..af13654f
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.3.6-SNAPSHOT
+
+
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+
+
+ 8
+ 8
+ UTF-8
+
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java
new file mode 100644
index 00000000..35b4b579
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.springboot;
+
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.List;
+
+@SpringBootApplication
+public class V5SimpleConsumerConsumerApplication implements CommandLineRunner {
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ List messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(60));
+ System.out.println(messageList);
+ }
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..327970f8
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq.simple-consumer.endpoints=localhost:8081
+rocketmq.simple-consumer.consumer-group=localhost:8081
+rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA
+rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag
+rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB
+rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag
+#rocketmq.simple-consumer.access-key=
+#rocketmq.simple-consumer.secret-key=
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
index 1a20a74c..b67eeead 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consumer-push-simple-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
index c997133f..8008c7ec 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consumer-simple-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
index 22b190cd..cd97b8bf 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-acl-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
index 6c23b307..a4e155a5 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
index b6769604..e72039fb 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-simple-demo
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
index 346311f2..fde2d72d 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
@@ -46,6 +46,7 @@
import java.time.Duration;
import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
@Configuration
@@ -105,7 +106,6 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr
RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer();
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String consumerGroup = simpleConsumer.getConsumerGroup();
- FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType());
ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer);
SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration);
@@ -116,9 +116,16 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr
if (StringUtils.hasLength(consumerGroup)) {
simpleConsumerBuilder.setConsumerGroup(consumerGroup);
}
+
// Set the subscription for the consumer.
- if (Objects.nonNull(filterExpression)) {
- simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression));
+ if (simpleConsumer.getSubscriptionExpressions().isEmpty()) {
+ FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType());
+ if (Objects.nonNull(filterExpression)) {
+ simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression));
+ }
+ } else {
+ Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getSubscriptionExpressions());
+ simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions);
}
return simpleConsumerBuilder;
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
index 8f3d4941..6e5be06c 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
@@ -18,6 +18,8 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.Map;
+
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
@@ -211,6 +213,11 @@ public static class SimpleConsumer {
private String namespace = "";
+ /**
+ * key is topic
+ */
+ private Map subscriptionExpressions;
+
public String getAccessKey() {
return accessKey;
}
@@ -299,6 +306,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}
+ public Map getSubscriptionExpressions() {
+ return subscriptionExpressions;
+ }
+
+ public void setSubscriptionExpressions(Map subscriptionExpressions) {
+ this.subscriptionExpressions = subscriptionExpressions;
+ }
+
@Override
public String toString() {
return "SimpleConsumer{" +
@@ -315,4 +330,25 @@ public String toString() {
}
}
+ public static class FilterExpression {
+ private String tag;
+
+ private String filterExpressionType;
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getFilterExpressionType() {
+ return filterExpressionType;
+ }
+
+ public void setFilterExpressionType(String filterExpressionType) {
+ this.filterExpressionType = filterExpressionType;
+ }
+ }
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
index 55f3948f..187f399c 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
@@ -34,6 +34,8 @@
import java.nio.charset.Charset;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
public class RocketMQUtil {
@@ -184,4 +186,14 @@ public static FilterExpression createFilterExpression(String tag, String type) {
FilterExpression filterExpression = new FilterExpression(tag, filterExpressionType);
return filterExpression;
}
+
+ public static Map createSubscriptionExpressions(Map map) {
+ Map subscriptionExpressions = new HashMap<>();
+ map.forEach((topic, expression) -> {
+ FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92;
+ FilterExpression filterExpression = new FilterExpression(expression.getTag(), filterExpressionType);
+ subscriptionExpressions.put(topic, filterExpression);
+ });
+ return subscriptionExpressions;
+ }
}