apply patch after review
This commit is contained in:
Timothy Bish 2014-02-19 14:12:49 -05:00
parent 0db7e69b4e
commit 7e56f348bc
3 changed files with 82 additions and 17 deletions

View File

@ -366,24 +366,33 @@ public class MQTTProtocolConverter {
}
QoS onSubscribe(Topic topic) throws MQTTProtocolException {
if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true);
if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
if( mqttSubscriptionByTopic.containsKey(topic.name())) {
if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
// remove old subscription as the QoS has changed
onUnSubscribe(topic.name());
} else {
// duplicate SUBSCRIBE packet, nothing to do
return topic.qos();
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
sendToActiveMQ(consumerInfo, null);
}
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true);
if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
sendToActiveMQ(consumerInfo, null);
return topic.qos();
}

View File

@ -67,4 +67,8 @@ class MQTTSubscription {
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
public QoS qos() {
return qos;
}
}

View File

@ -301,7 +301,7 @@ public class MQTTTest extends AbstractMQTTTest {
initializeConnection(subscriber);
String RETAINED = "retained";
publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true);
publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true);
List<String> messages = new ArrayList<String>();
for (int i = 0; i < 10; i++){
@ -320,7 +320,7 @@ public class MQTTTest extends AbstractMQTTTest {
for (int i =0; i < 10; i++){
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(messages.get(i),new String(msg));
assertEquals(messages.get(i), new String(msg));
}
subscriber.disconnect();
publisher.disconnect();
@ -377,6 +377,58 @@ public class MQTTTest extends AbstractMQTTTest {
}
@Test(timeout = 60 * 1000)
public void testDuplicateSubscriptions() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2);
final int[] actualQoS = {-1};
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
// validate the QoS
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
publish.decode(frame);
} catch (ProtocolException e) {
fail("Failed decoding " + e.getMessage());
}
actualQoS[0] = publish.qos().ordinal();
}
}
});
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String RETAIN = "RETAIN";
connection.publish("TopicA", RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
for (QoS qos : qoss) {
connection.subscribe(new Topic[]{ new Topic("TopicA", qos) });
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(RETAIN, new String(msg.getPayload()));
int waitCount = 0;
while (actualQoS[0] == -1 && waitCount < 10) {
Thread.sleep(1000);
waitCount++;
}
assertEquals(qos.ordinal(), actualQoS[0]);
}
connection.unsubscribe(new String[]{"TopicA"});
connection.disconnect();
}
@Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();