ARTEMIS-1272 fix mqtt acknowledgement issue
This commit is contained in:
parent
ca9b6d4cc0
commit
34697d58e1
|
@ -119,7 +119,7 @@ public class MQTTPublishManager {
|
||||||
int qos = decideQoS(message, consumer);
|
int qos = decideQoS(message, consumer);
|
||||||
if (qos == 0) {
|
if (qos == 0) {
|
||||||
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
|
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
|
||||||
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
|
session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
|
||||||
} else if (qos == 1 || qos == 2) {
|
} else if (qos == 1 || qos == 2) {
|
||||||
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
||||||
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
||||||
|
@ -202,7 +202,7 @@ public class MQTTPublishManager {
|
||||||
if (ref != null) {
|
if (ref != null) {
|
||||||
Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
|
Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
|
||||||
session.getServerSession().send(m, true);
|
session.getServerSession().send(m, true);
|
||||||
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
session.getServerSession().individualAcknowledge(ref.getB(), ref.getA());
|
||||||
} else {
|
} else {
|
||||||
session.getProtocolHandler().sendPubRel(messageId);
|
session.getProtocolHandler().sendPubRel(messageId);
|
||||||
}
|
}
|
||||||
|
@ -214,7 +214,7 @@ public class MQTTPublishManager {
|
||||||
void handlePubComp(int messageId) throws Exception {
|
void handlePubComp(int messageId) throws Exception {
|
||||||
Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId);
|
Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId);
|
||||||
if (ref != null) {
|
if (ref != null) {
|
||||||
session.getServerSession().acknowledge(managementConsumer.getID(), ref.getA());
|
session.getServerSession().individualAcknowledge(managementConsumer.getID(), ref.getA());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,7 +249,7 @@ public class MQTTPublishManager {
|
||||||
try {
|
try {
|
||||||
Pair<Long, Long> ref = outboundStore.publishAckd(messageId);
|
Pair<Long, Long> ref = outboundStore.publishAckd(messageId);
|
||||||
if (ref != null) {
|
if (ref != null) {
|
||||||
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
session.getServerSession().individualAcknowledge(ref.getB(), ref.getA());
|
||||||
}
|
}
|
||||||
} catch (ActiveMQIllegalStateException e) {
|
} catch (ActiveMQIllegalStateException e) {
|
||||||
log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
|
log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
||||||
<karaf.version>4.0.6</karaf.version>
|
<karaf.version>4.0.6</karaf.version>
|
||||||
<pax.exam.version>4.9.1</pax.exam.version>
|
<pax.exam.version>4.9.1</pax.exam.version>
|
||||||
|
<paho.client.mqttv3.version>1.1.0</paho.client.mqttv3.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<repositories>
|
<repositories>
|
||||||
|
@ -167,8 +168,9 @@
|
||||||
<artifactId>mqtt-client</artifactId>
|
<artifactId>mqtt-client</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.paho</groupId>
|
<groupId>org.eclipse.paho</groupId>
|
||||||
<artifactId>mqtt-client</artifactId>
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
<version>${paho.client.mqttv3.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.mqtt;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
import org.jgroups.util.UUID;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class MqttAcknowledgementTest extends MQTTTestSupport {
|
||||||
|
|
||||||
|
private volatile LinkedList<Integer> messageIds = new LinkedList<>();
|
||||||
|
private volatile boolean messageArrived = false;
|
||||||
|
|
||||||
|
private MqttClient subscriber;
|
||||||
|
private MqttClient sender;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void clean() throws MqttException {
|
||||||
|
messageArrived = false;
|
||||||
|
messageIds.clear();
|
||||||
|
if (subscriber.isConnected()) {
|
||||||
|
subscriber.disconnect();
|
||||||
|
}
|
||||||
|
if (sender.isConnected()) {
|
||||||
|
sender.disconnect();
|
||||||
|
}
|
||||||
|
subscriber.close();
|
||||||
|
sender.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testAcknowledgementQOS1() throws Exception {
|
||||||
|
test(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000, expected = AssertionError.class)
|
||||||
|
public void testAcknowledgementQOS0() throws Exception {
|
||||||
|
test(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void test(int qos) throws Exception {
|
||||||
|
String subscriberId = UUID.randomUUID().toString();
|
||||||
|
String senderId = UUID.randomUUID().toString();
|
||||||
|
String topic = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
subscriber = createMqttClient(subscriberId);
|
||||||
|
subscriber.subscribe(topic, qos);
|
||||||
|
|
||||||
|
sender = createMqttClient(senderId);
|
||||||
|
sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false);
|
||||||
|
sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false);
|
||||||
|
|
||||||
|
boolean satisfied = Wait.waitFor(() -> messageIds.size() == 2, 5_000);
|
||||||
|
if (!satisfied) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.messageArrivedComplete(messageIds.getLast(), qos);
|
||||||
|
subscriber.disconnect();
|
||||||
|
subscriber.close();
|
||||||
|
messageArrived = false;
|
||||||
|
|
||||||
|
satisfied = Wait.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
subscriber = createMqttClient(subscriberId);
|
||||||
|
return true;
|
||||||
|
} catch (MqttException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 60_000);
|
||||||
|
if (!satisfied) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
satisfied = Wait.waitFor(() -> messageArrived == true, 5_000);
|
||||||
|
if (!satisfied) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MqttClient createMqttClient(String clientId) throws MqttException {
|
||||||
|
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
|
||||||
|
client.setCallback(createCallback());
|
||||||
|
client.setManualAcks(true);
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setCleanSession(false);
|
||||||
|
client.connect(options);
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MqttCallback createCallback() {
|
||||||
|
return new MqttCallback() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
messageIds.add(message.getId());
|
||||||
|
messageArrived = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable cause) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue