mirror of https://github.com/apache/activemq.git
Fix for subscription recovery of durable topic subscriptions using default subscription strategy and subscribing to a VirtualTopic instance.
This commit is contained in:
parent
928e815a02
commit
47e954d0f6
|
@ -152,7 +152,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
|
|||
QoS qoS = QoS.valueOf(split[0]);
|
||||
onSubscribe(new Topic(split[1], qoS));
|
||||
// mark this durable subscription as restored by Broker
|
||||
restoredSubs.add(split[1]);
|
||||
restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not restore the MQTT durable subs.", e);
|
||||
|
|
|
@ -64,7 +64,6 @@ import org.fusesource.mqtt.client.Topic;
|
|||
import org.fusesource.mqtt.client.Tracer;
|
||||
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||
import org.fusesource.mqtt.codec.PUBLISH;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -510,87 +509,28 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
|
||||
@Test(timeout = 120 * 1000)
|
||||
public void testRetainedMessage() throws Exception {
|
||||
MQTT mqtt = createMQTTConnection();
|
||||
mqtt.setKeepAlive((short) 60);
|
||||
|
||||
final String RETAIN = "RETAIN";
|
||||
final String TOPICA = "TopicA";
|
||||
|
||||
final String[] clientIds = { null, "foo", "durable" };
|
||||
for (String clientId : clientIds) {
|
||||
LOG.info("Testing now with Client ID: {}", clientId);
|
||||
|
||||
mqtt.setClientId(clientId);
|
||||
mqtt.setCleanSession(!"durable".equals(clientId));
|
||||
|
||||
BlockingConnection connection = mqtt.blockingConnection();
|
||||
connection.connect();
|
||||
|
||||
// set retained message and check
|
||||
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No retained message for " + clientId, msg);
|
||||
assertEquals(RETAIN, new String(msg.getPayload()));
|
||||
msg.ack();
|
||||
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
|
||||
|
||||
// test duplicate subscription
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(15000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
|
||||
assertEquals(RETAIN, new String(msg.getPayload()));
|
||||
msg.ack();
|
||||
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
|
||||
connection.unsubscribe(new String[]{TOPICA});
|
||||
|
||||
// clear retained message and check that we don't receive it
|
||||
connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(500, TimeUnit.MILLISECONDS);
|
||||
assertNull("Retained message not cleared for " + clientId, msg);
|
||||
connection.unsubscribe(new String[]{TOPICA});
|
||||
|
||||
// set retained message again and check
|
||||
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No reset retained message for " + clientId, msg);
|
||||
assertEquals(RETAIN, new String(msg.getPayload()));
|
||||
msg.ack();
|
||||
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
|
||||
|
||||
// re-connect and check
|
||||
connection.disconnect();
|
||||
connection = mqtt.blockingConnection();
|
||||
connection.connect();
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No reset retained message for " + clientId, msg);
|
||||
assertEquals(RETAIN, new String(msg.getPayload()));
|
||||
msg.ack();
|
||||
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
|
||||
|
||||
connection.unsubscribe(new String[]{TOPICA});
|
||||
connection.disconnect();
|
||||
}
|
||||
doTestRetainedMessages("TopicA");
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Test(timeout = 120 * 1000)
|
||||
public void testRetainedMessageOnVirtualTopics() throws Exception {
|
||||
doTestRetainedMessages("VirtualTopic/TopicA");
|
||||
}
|
||||
|
||||
public void doTestRetainedMessages(String topicName) throws Exception {
|
||||
MQTT mqtt = createMQTTConnection();
|
||||
mqtt.setKeepAlive((short) 60);
|
||||
|
||||
final String RETAIN = "RETAIN";
|
||||
final String TOPICA = "VirtualTopic/TopicA";
|
||||
final String TOPICA = topicName;
|
||||
|
||||
final String[] clientIds = { null, "foo", "durable" };
|
||||
for (String clientId : clientIds) {
|
||||
LOG.info("Testing now with Client ID: {}", clientId);
|
||||
boolean cleanSession = !"durable".equals(clientId);
|
||||
LOG.info("Testing now with Client ID: {} clean: {}", clientId, cleanSession);
|
||||
|
||||
mqtt.setClientId(clientId);
|
||||
mqtt.setCleanSession(!"durable".equals(clientId));
|
||||
mqtt.setCleanSession(cleanSession);
|
||||
|
||||
BlockingConnection connection = mqtt.blockingConnection();
|
||||
connection.connect();
|
||||
|
@ -622,6 +562,7 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
|
||||
// set retained message again and check
|
||||
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||
LOG.info("Performing first subscription");
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No reset retained message for " + clientId, msg);
|
||||
|
@ -633,6 +574,7 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
connection.disconnect();
|
||||
connection = mqtt.blockingConnection();
|
||||
connection.connect();
|
||||
LOG.info("Performing second subscription");
|
||||
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
assertNotNull("No reset retained message for " + clientId, msg);
|
||||
|
|
Loading…
Reference in New Issue