diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 4f5fec62a56..5f8d8453f82 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -208,9 +208,6 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception int packetId = message.variableHeader().packetId(); boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId); if (qos < 2 || !qos2PublishAlreadyReceived) { - if (qos == 2 && !internal) - state.getPubRec().add(packetId); - Transaction tx = session.getServerSession().newTransaction(); try { AddressInfo addressInfo = session.getServer().getAddressInfo(address); @@ -223,6 +220,10 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception } session.getServerSession().send(tx, serverMessage, true, senderName, false); + if (qos == 2 && !internal) { + state.getPubRec().add(packetId); + } + if (message.fixedHeader().isRetain()) { ByteBuf payload = message.payload(); boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java index 62f95466c07..f0122217640 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; @@ -42,6 +43,10 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport { String user1 = "user1"; String password1 = "password1"; + final String noSendUser = "noSendUser"; + final String noSendPassword = "noSendPassword"; + final String noSendRole = "noSendRole"; + @Override protected void configureBrokerSecurity(ActiveMQServer server) { super.configureBrokerSecurity(server); @@ -51,10 +56,14 @@ protected void configureBrokerSecurity(ActiveMQServer server) { securityManager.getConfiguration().addUser(user1, password1); securityManager.getConfiguration().addRole(user1, "addressOnly"); + securityManager.getConfiguration().addUser(noSendUser, noSendPassword); + securityManager.getConfiguration().addRole(noSendUser, noSendRole); + // Configure roles HierarchicalRepository> securityRepository = server.getSecurityRepository(); Set value = new HashSet<>(); value.add(new Role("addressOnly", true, true, true, true, false, false, false, false, true, true, false, false)); + value.add(new Role(noSendRole, false, true, true, true, true, true, true, true, true, true, true, true)); securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value); } @@ -108,6 +117,27 @@ public void deliveryComplete(IMqttDeliveryToken token) { assertFalse(failed[0]); } + @Test + @Timeout(60) + public void testSendQoS2UnauthorizedNotStorePublish() throws Exception { + final String clientID = "clientID"; + + MqttClient producer = createPahoClient(clientID); + MqttConnectOptions conOpt = new MqttConnectOptions(); + conOpt.setCleanSession(false); + conOpt.setUserName(noSendUser); + conOpt.setPassword(noSendPassword.toCharArray()); + + producer.connect(conOpt); + try { + producer.publish(getQueueName(), "hello".getBytes(), 2, false); + } catch (MqttException e) { + // ignore + } + assertEquals(0, getSessions().get(clientID).getPubRec().size()); + producer.close(); + } + private MqttClient createPahoClient(String clientId) throws MqttException { return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); }