Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,6 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
int packetId = message.variableHeader().packetId();
boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId);
if (qos < 2 || !qos2PublishAlreadyReceived) {
if (qos == 2 && !internal)
state.getPubRec().add(packetId);

Transaction tx = session.getServerSession().newTransaction();
try {
AddressInfo addressInfo = session.getServer().getAddressInfo(address);
Expand All @@ -223,6 +220,10 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
}
session.getServerSession().send(tx, serverMessage, true, senderName, false);

if (qos == 2 && !internal) {
state.getPubRec().add(packetId);
}

if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
Expand All @@ -42,6 +43,10 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
String user1 = "user1";
String password1 = "password1";

final String noSendUser = "noSendUser";
final String noSendPassword = "noSendPassword";
final String noSendRole = "noSendRole";

@Override
protected void configureBrokerSecurity(ActiveMQServer server) {
super.configureBrokerSecurity(server);
Expand All @@ -51,10 +56,14 @@ protected void configureBrokerSecurity(ActiveMQServer server) {
securityManager.getConfiguration().addUser(user1, password1);
securityManager.getConfiguration().addRole(user1, "addressOnly");

securityManager.getConfiguration().addUser(noSendUser, noSendPassword);
securityManager.getConfiguration().addRole(noSendUser, noSendRole);

// Configure roles
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
Set<Role> value = new HashSet<>();
value.add(new Role("addressOnly", true, true, true, true, false, false, false, false, true, true, false, false));
value.add(new Role(noSendRole, false, true, true, true, true, true, true, true, true, true, true, true));

securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value);
}
Expand Down Expand Up @@ -108,6 +117,27 @@ public void deliveryComplete(IMqttDeliveryToken token) {
assertFalse(failed[0]);
}

@Test
@Timeout(60)
public void testSendQoS2UnauthorizedNotStorePublish() throws Exception {
final String clientID = "clientID";

MqttClient producer = createPahoClient(clientID);
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(false);
conOpt.setUserName(noSendUser);
conOpt.setPassword(noSendPassword.toCharArray());

producer.connect(conOpt);
try {
producer.publish(getQueueName(), "hello".getBytes(), 2, false);
} catch (MqttException e) {
// ignore
}
assertEquals(0, getSessions().get(clientID).getPubRec().size());
producer.close();
}

private MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
}
Expand Down