diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 5da027abad..d12f30666f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -119,7 +119,7 @@ public class MQTTPublishManager { int qos = decideQoS(message, consumer); if (qos == 0) { sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos); - session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); + session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID()); } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); @@ -202,7 +202,7 @@ public class MQTTPublishManager { if (ref != null) { Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId); session.getServerSession().send(m, true); - session.getServerSession().acknowledge(ref.getB(), ref.getA()); + session.getServerSession().individualAcknowledge(ref.getB(), ref.getA()); } else { session.getProtocolHandler().sendPubRel(messageId); } @@ -214,7 +214,7 @@ public class MQTTPublishManager { void handlePubComp(int messageId) throws Exception { Pair ref = session.getState().getOutboundStore().publishComplete(messageId); if (ref != null) { - session.getServerSession().acknowledge(managementConsumer.getID(), ref.getA()); + session.getServerSession().individualAcknowledge(managementConsumer.getID(), ref.getA()); } } @@ -249,7 +249,7 @@ public class MQTTPublishManager { try { Pair ref = outboundStore.publishAckd(messageId); if (ref != null) { - session.getServerSession().acknowledge(ref.getB(), ref.getA()); + session.getServerSession().individualAcknowledge(ref.getB(), ref.getA()); } } catch (ActiveMQIllegalStateException e) { log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message"); diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 56a4bc8fdb..87994715c0 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -30,6 +30,7 @@ ${project.basedir}/../.. 4.0.6 4.9.1 + 1.1.0 @@ -167,8 +168,9 @@ mqtt-client - org.eclipse.paho - mqtt-client + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${paho.client.mqttv3.version} io.netty diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java new file mode 100644 index 0000000000..239da0daae --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt; + +import java.util.LinkedList; + +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.jgroups.util.UUID; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class MqttAcknowledgementTest extends MQTTTestSupport { + + private volatile LinkedList messageIds = new LinkedList<>(); + private volatile boolean messageArrived = false; + + private MqttClient subscriber; + private MqttClient sender; + + @After + public void clean() throws MqttException { + messageArrived = false; + messageIds.clear(); + if (subscriber.isConnected()) { + subscriber.disconnect(); + } + if (sender.isConnected()) { + sender.disconnect(); + } + subscriber.close(); + sender.close(); + } + + @Test(timeout = 300000) + public void testAcknowledgementQOS1() throws Exception { + test(1); + } + + @Test(timeout = 300000, expected = AssertionError.class) + public void testAcknowledgementQOS0() throws Exception { + test(0); + } + + private void test(int qos) throws Exception { + String subscriberId = UUID.randomUUID().toString(); + String senderId = UUID.randomUUID().toString(); + String topic = UUID.randomUUID().toString(); + + subscriber = createMqttClient(subscriberId); + subscriber.subscribe(topic, qos); + + sender = createMqttClient(senderId); + sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false); + sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false); + + boolean satisfied = Wait.waitFor(() -> messageIds.size() == 2, 5_000); + if (!satisfied) { + Assert.fail(); + } + + subscriber.messageArrivedComplete(messageIds.getLast(), qos); + subscriber.disconnect(); + subscriber.close(); + messageArrived = false; + + satisfied = Wait.waitFor(() -> { + try { + subscriber = createMqttClient(subscriberId); + return true; + } catch (MqttException e) { + return false; + } + }, 60_000); + if (!satisfied) { + Assert.fail(); + } + + satisfied = Wait.waitFor(() -> messageArrived == true, 5_000); + if (!satisfied) { + Assert.fail(); + } + } + + private MqttClient createMqttClient(String clientId) throws MqttException { + MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); + client.setCallback(createCallback()); + client.setManualAcks(true); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(false); + client.connect(options); + return client; + } + + private MqttCallback createCallback() { + return new MqttCallback() { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + messageIds.add(message.getId()); + messageArrived = true; + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void connectionLost(Throwable cause) { + } + }; + } +}