diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml index ae931911..d8559c9a 100644 --- a/rocketmq-v5-client-spring-boot-samples/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples pom - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-spring-boot-samples rocketmq-v5-client-spring-boot-samples @@ -35,7 +35,9 @@ rocketmq-v5-client-producer-simple-demo rocketmq-v5-client-consumer-simple-demo rocketmq-v5-client-consumer-push-simple-demo - + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + 1.8 diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml index a73027b6..cc600607 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml index 0b2c891b..2b5d46a8 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml new file mode 100644 index 00000000..af13654f --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-v5-client-spring-boot-samples + 2.3.6-SNAPSHOT + + + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java new file mode 100644 index 00000000..35b4b579 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.springboot; + +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import javax.annotation.Resource; +import java.time.Duration; +import java.util.List; + +@SpringBootApplication +public class V5SimpleConsumerConsumerApplication implements CommandLineRunner { + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + public static void main(String[] args) { + SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args); + } + + @Override + public void run(String... args) throws Exception { + for (int i = 0; i < 10; i++) { + List messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(60)); + System.out.println(messageList); + } + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties new file mode 100644 index 00000000..327970f8 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rocketmq.simple-consumer.endpoints=localhost:8081 +rocketmq.simple-consumer.consumer-group=localhost:8081 +rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA +rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag +rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB +rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag +#rocketmq.simple-consumer.access-key= +#rocketmq.simple-consumer.secret-key= + diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml index 1a20a74c..b67eeead 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-push-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml index c997133f..8008c7ec 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml index 22b190cd..cd97b8bf 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml index 6c23b307..a4e155a5 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml index b6769604..e72039fb 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-simple-demo diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..fde2d72d 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -46,6 +46,7 @@ import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Objects; @Configuration @@ -105,7 +106,6 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration); @@ -116,9 +116,16 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr if (StringUtils.hasLength(consumerGroup)) { simpleConsumerBuilder.setConsumerGroup(consumerGroup); } + // Set the subscription for the consumer. - if (Objects.nonNull(filterExpression)) { - simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + if (simpleConsumer.getSubscriptionExpressions().isEmpty()) { + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); + if (Objects.nonNull(filterExpression)) { + simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + } + } else { + Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getSubscriptionExpressions()); + simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); } return simpleConsumerBuilder; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..6e5be06c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -18,6 +18,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.Map; + @SuppressWarnings("WeakerAccess") @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { @@ -211,6 +213,11 @@ public static class SimpleConsumer { private String namespace = ""; + /** + * key is topic + */ + private Map subscriptionExpressions; + public String getAccessKey() { return accessKey; } @@ -299,6 +306,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public Map getSubscriptionExpressions() { + return subscriptionExpressions; + } + + public void setSubscriptionExpressions(Map subscriptionExpressions) { + this.subscriptionExpressions = subscriptionExpressions; + } + @Override public String toString() { return "SimpleConsumer{" + @@ -315,4 +330,25 @@ public String toString() { } } + public static class FilterExpression { + private String tag; + + private String filterExpressionType; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getFilterExpressionType() { + return filterExpressionType; + } + + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; + } + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 55f3948f..187f399c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -34,6 +34,8 @@ import java.nio.charset.Charset; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class RocketMQUtil { @@ -184,4 +186,14 @@ public static FilterExpression createFilterExpression(String tag, String type) { FilterExpression filterExpression = new FilterExpression(tag, filterExpressionType); return filterExpression; } + + public static Map createSubscriptionExpressions(Map map) { + Map subscriptionExpressions = new HashMap<>(); + map.forEach((topic, expression) -> { + FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; + FilterExpression filterExpression = new FilterExpression(expression.getTag(), filterExpressionType); + subscriptionExpressions.put(topic, filterExpression); + }); + return subscriptionExpressions; + } }