Fixed AMQ-5160, fixed race condition for retained messages

This commit is contained in:
Dhiraj Bokde 2014-05-13 12:16:44 -07:00 committed by Dejan Bosanac
parent c915b19a20
commit 8644090377
1 changed files with 11 additions and 5 deletions

View File

@ -386,6 +386,10 @@ public class MQTTProtocolConverter {
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
// optimistic add to local maps first to be able to handle commands in onActiveMQCommand
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
@ -401,9 +405,10 @@ public class MQTTProtocolConverter {
}
});
if (qos[0] != SUBSCRIBE_ERROR) {
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
if (qos[0] == SUBSCRIBE_ERROR) {
// remove from local maps if subscribe failed
subscriptionsByConsumerId.remove(id);
mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
@ -431,7 +436,7 @@ public class MQTTProtocolConverter {
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
// recover retroactive messages for matching subscriptions
// recover retroactive messages for matching subscription
for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try {
@ -440,6 +445,7 @@ public class MQTTProtocolConverter {
throw new MQTTProtocolException("Error recovering retained messages for " +
dest.getName() + ": " + e.getMessage(), false, e);
}
break;
}
}
}
@ -483,7 +489,7 @@ public class MQTTProtocolConverter {
}
/**
* Dispatch a ActiveMQ command
* Dispatch an ActiveMQ command
*/
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {