ARTEMIS-1061 Ack MQTT PubRel management messages

This commit is contained in:
Martyn Taylor 2017-03-22 21:27:56 +00:00 committed by Clebert Suconic
parent 1f5921b8a2
commit e33b7af5ac
4 changed files with 36 additions and 5 deletions

View File

@ -42,8 +42,6 @@ public class MQTTPublishManager {
private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
private SimpleString managementAddress;
private ServerConsumer managementConsumer;
@ -90,7 +88,7 @@ public class MQTTPublishManager {
}
private SimpleString createManagementAddress() {
return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
return new SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
}
private void createManagementQueue() throws Exception {
@ -183,6 +181,7 @@ public class MQTTPublishManager {
void sendPubRelMessage(Message message) {
int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
session.getSessionState().getOutboundStore().publishReleasedSent(messageId, message.getMessageID());
session.getProtocolHandler().sendPubRel(messageId);
}
@ -213,7 +212,7 @@ public class MQTTPublishManager {
void handlePubComp(int messageId) throws Exception {
Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId);
if (ref != null) {
session.getServerSession().acknowledge(ref.getB(), ref.getA());
session.getServerSession().acknowledge(managementConsumer.getID(), ref.getA());
}
}

View File

@ -168,6 +168,12 @@ public class MQTTSessionState {
return publishAckd(mqtt);
}
public void publishReleasedSent(int mqttId, long serverMessageId) {
synchronized (dataStoreLock) {
mqttToServerIds.put(mqttId, new Pair<>(serverMessageId, 0L));
}
}
public Pair<Long, Long> publishComplete(int mqtt) {
return publishAckd(mqtt);
}

View File

@ -63,6 +63,8 @@ public class MQTTUtil {
public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
public static String convertMQTTAddressFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) {

View File

@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
@ -188,6 +190,29 @@ public class MQTTTest extends MQTTTestSupport {
provider.disconnect();
}
@Test(timeout = 2 * 60 * 1000)
public void testManagementQueueMessagesAreAckd() throws Exception {
String clientId = "test.client.id";
final MQTTClientProvider provider = getMQTTClientProvider();
provider.setClientId(clientId);
initializeConnection(provider);
provider.subscribe("foo", EXACTLY_ONCE);
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), EXACTLY_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
final Queue queue = server.locateQueue(new SimpleString(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + clientId));
Wait.waitFor(() -> queue.getMessageCount() == 0, 1000, 100);
assertEquals(0, queue.getMessageCount());
provider.disconnect();
}
@Test(timeout = 2 * 60 * 1000)
public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
@ -1065,7 +1090,6 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals("test message", new String(m.getPayload()));
}
@Test(timeout = 60 * 1000)
public void testCleanSession() throws Exception {
final String CLIENTID = "cleansession";