From 7948d69056be9021e07fbdb1dd158f75f52b7bf9 Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Fri, 9 Jan 2015 12:50:39 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5511 - retained message for zero-byte clientId client --- .../AbstractMQTTSubscriptionStrategy.java | 6 ++++- .../activemq/transport/mqtt/MQTTTest.java | 23 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java index d77c51b083..121b8296f5 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java @@ -129,7 +129,11 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti // use actual client id used to create connection to lookup connection // context - final String connectionInfoClientId = protocol.getClientId(); + String connectionInfoClientId = protocol.getClientId(); + // for zero-byte client ids we used connection id + if (connectionInfoClientId == null || connectionInfoClientId.isEmpty()) { + connectionInfoClientId = protocol.getConnectionId().toString(); + } final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId); // get all matching Topics diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index c9f106d137..3bb8758032 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -966,6 +966,29 @@ public class MQTTTest extends MQTTTestSupport { newConnection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testNoClientId() throws Exception { + final MQTT mqtt = createMQTTConnection("", true); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connection.isConnected(); + } + }); + + connection.subscribe(new Topic[]{new Topic("TopicA", QoS.AT_LEAST_ONCE)}); + connection.publish("TopicA", "test".getBytes(), QoS.AT_LEAST_ONCE, true); + Message message = connection.receive(3, TimeUnit.SECONDS); + assertNotNull(message); + Thread.sleep(2000); + connection.subscribe(new Topic[]{new Topic("TopicA", QoS.AT_LEAST_ONCE)}); + //TODO fix audit problem for retained messages + //message = connection.receive(3, TimeUnit.SECONDS); + //assertNotNull(message); + } + @Test(timeout = 60 * 1000) public void testCleanSession() throws Exception { final String CLIENTID = "cleansession";