Some more refactorings to allow the strategies to have more control of
the subsciption model in use.
This commit is contained in:
Timothy Bish 2014-08-11 14:03:35 -04:00
parent 8b64e139f8
commit ec049a0882
5 changed files with 193 additions and 144 deletions

View File

@ -41,15 +41,12 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
@ -93,17 +90,13 @@ public class MQTTProtocolConverter {
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5; private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
static final int DEFAULT_CACHE_SIZE = 5000; static final int DEFAULT_CACHE_SIZE = 5000;
private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1); private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
@ -120,7 +113,7 @@ public class MQTTProtocolConverter {
private CONNECT connect; private CONNECT connect;
private String clientId; private String clientId;
private long defaultKeepAlive; private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1; private int activeMQSubscriptionPrefetch = 1;
protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
private final MQTTPacketIdGenerator packetIdGenerator; private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics; private boolean publishDollarTopics;
@ -351,7 +344,11 @@ public class MQTTProtocolConverter {
if (topics != null) { if (topics != null) {
byte[] qos = new byte[topics.length]; byte[] qos = new byte[topics.length];
for (int i = 0; i < topics.length; i++) { for (int i = 0; i < topics.length; i++) {
qos[i] = onSubscribe(topics[i]); try {
qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]);
} catch (IOException e) {
throw new MQTTProtocolException("Failed to process subscription request", true, e);
}
} }
SUBACK ack = new SUBACK(); SUBACK ack = new SUBACK();
ack.messageId(command.messageId()); ack.messageId(command.messageId());
@ -366,71 +363,16 @@ public class MQTTProtocolConverter {
} }
} }
public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
final String destinationName = topic.name().toString();
final QoS requestedQoS = topic.qos();
if (mqttSubscriptionByTopic.containsKey(destinationName)) {
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
if (requestedQoS != mqttSubscription.getQoS()) {
// remove old subscription as the QoS has changed
onUnSubscribe(destinationName);
} else {
try {
getSubscriptionStrategy().onReSubscribe(mqttSubscription);
} catch (IOException e) {
throw new MQTTProtocolException("Failed to find subscription strategy", true, e);
}
return (byte) requestedQoS.ordinal();
}
}
try {
return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS);
} catch (IOException e) {
throw new MQTTProtocolException("Failed while intercepting subscribe", true, e);
}
}
public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException {
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo);
// optimistic add to local maps first to be able to handle commands in onActiveMQCommand
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.warn("Error subscribing to {}", topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
qos[0] = (byte) qoS.ordinal();
}
}
});
if (qos[0] == SUBSCRIBE_ERROR) {
// remove from local maps if subscribe failed
subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
}
public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
checkConnected(); checkConnected();
UTF8Buffer[] topics = command.topics(); UTF8Buffer[] topics = command.topics();
if (topics != null) { if (topics != null) {
for (UTF8Buffer topic : topics) { for (UTF8Buffer topic : topics) {
onUnSubscribe(topic.toString()); try {
getSubscriptionStrategy().onUnSubscribe(topic.toString());
} catch (IOException e) {
throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
}
} }
} }
UNSUBACK ack = new UNSUBACK(); UNSUBACK ack = new UNSUBACK();
@ -438,38 +380,6 @@ public class MQTTProtocolConverter {
sendToMQTT(ack.encode()); sendToMQTT(ack.encode());
} }
public void onUnSubscribe(String topicName) {
MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
if (subscription != null) {
doUnSubscribe(subscription);
// check if the broker side of the subscription needs to be removed
try {
getSubscriptionStrategy().onUnSubscribe(subscription);
} catch (IOException e) {
// Ignore
}
}
}
public void doUnSubscribe(MQTTSubscription subscription) {
mqttSubscriptionByTopic.remove(subscription.getTopicName());
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
}
RemoveInfo removeInfo = null;
if (info != null) {
removeInfo = info.createRemoveCommand();
}
sendToActiveMQ(removeInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
/** /**
* Dispatch an ActiveMQ command * Dispatch an ActiveMQ command
*/ */
@ -488,7 +398,7 @@ public class MQTTProtocolConverter {
} }
} else if (command.isMessageDispatch()) { } else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command; MessageDispatch md = (MessageDispatch) command;
MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId());
if (sub != null) { if (sub != null) {
MessageAck ack = sub.createMessageAck(md); MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
@ -848,8 +758,8 @@ public class MQTTProtocolConverter {
return connectionId; return connectionId;
} }
public ConsumerId getNextConsumerId() { public SessionId getSessionId() {
return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); return sessionId;
} }
public boolean isCleanSession() { public boolean isCleanSession() {

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.activemq.transport.mqtt.strategy; package org.apache.activemq.transport.mqtt.strategy;
import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
@ -30,9 +32,18 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException; import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing * Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing
@ -40,9 +51,18 @@ import org.apache.activemq.transport.mqtt.MQTTSubscription;
*/ */
public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware { public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMQTTSubscriptionStrategy.class);
private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
protected MQTTProtocolConverter protocol; protected MQTTProtocolConverter protocol;
protected BrokerService brokerService; protected BrokerService brokerService;
protected final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
protected final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@Override @Override
public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException { public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException {
setProtocolConverter(protocol); setProtocolConverter(protocol);
@ -63,6 +83,34 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
return protocol; return protocol;
} }
@Override
public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
final String destinationName = topic.name().toString();
final QoS requestedQoS = topic.qos();
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
if (mqttSubscription != null) {
if (requestedQoS != mqttSubscription.getQoS()) {
// remove old subscription as the QoS has changed
onUnSubscribe(destinationName);
} else {
try {
onReSubscribe(mqttSubscription);
} catch (IOException e) {
throw new MQTTProtocolException("Failed to find subscription strategy", true, e);
}
return (byte) requestedQoS.ordinal();
}
}
try {
return onSubscribe(destinationName, requestedQoS);
} catch (IOException e) {
throw new MQTTProtocolException("Failed while intercepting subscribe", true, e);
}
}
@Override @Override
public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException { public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
String topicName = mqttSubscription.getTopicName(); String topicName = mqttSubscription.getTopicName();
@ -126,4 +174,61 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
public boolean isControlTopic(ActiveMQDestination destination) { public boolean isControlTopic(ActiveMQDestination destination) {
return destination.getPhysicalName().startsWith("$"); return destination.getPhysicalName().startsWith("$");
} }
@Override
public MQTTSubscription getSubscription(ConsumerId consumerId) {
return subscriptionsByConsumerId.get(consumerId);
}
protected ConsumerId getNextConsumerId() {
return new ConsumerId(protocol.getSessionId(), consumerIdGenerator.getNextSequenceId());
}
protected byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException {
MQTTSubscription mqttSubscription = new MQTTSubscription(protocol, topicName, qoS, consumerInfo);
// optimistic add to local maps first to be able to handle commands in onActiveMQCommand
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
final byte[] qos = {-1};
protocol.sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.warn("Error subscribing to {}", topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
qos[0] = (byte) qoS.ordinal();
}
}
});
if (qos[0] == SUBSCRIBE_ERROR) {
// remove from local maps if subscribe failed
subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
}
public void doUnSubscribe(MQTTSubscription subscription) {
mqttSubscriptionByTopic.remove(subscription.getTopicName());
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
RemoveInfo removeInfo = info.createRemoveCommand();
protocol.sendToActiveMQ(removeInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
}
} }

View File

@ -68,7 +68,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName)); ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(destination); consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true); consumerInfo.setRetroactive(true);
@ -78,7 +78,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
} }
return protocol.doSubscribe(consumerInfo, topicName, requestedQoS); return doSubscribe(consumerInfo, topicName, requestedQoS);
} }
@Override @Override
@ -96,7 +96,11 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
} }
@Override @Override
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException { public void onUnSubscribe(String topicName) throws MQTTProtocolException {
MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
if (subscription != null) {
doUnSubscribe(subscription);
// check if the durable sub also needs to be removed // check if the durable sub also needs to be removed
if (subscription.getConsumerInfo().getSubscriptionName() != null) { if (subscription.getConsumerInfo().getSubscriptionName() != null) {
// also remove it from restored durable subscriptions set // also remove it from restored durable subscriptions set
@ -114,6 +118,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
}); });
} }
} }
}
private void deleteDurableSubs(List<SubscriptionInfo> subs) { private void deleteDurableSubs(List<SubscriptionInfo> subs) {
try { try {
@ -140,7 +145,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
String name = sub.getSubcriptionName(); String name = sub.getSubcriptionName();
String[] split = name.split(":", 2); String[] split = name.split(":", 2);
QoS qoS = QoS.valueOf(split[0]); QoS qoS = QoS.valueOf(split[0]);
protocol.onSubscribe(new Topic(split[1], qoS)); onSubscribe(new Topic(split[1], qoS));
// mark this durable subscription as restored by Broker // mark this durable subscription as restored by Broker
restoredSubs.add(split[1]); restoredSubs.add(split[1]);
} }

View File

@ -17,10 +17,12 @@
package org.apache.activemq.transport.mqtt.strategy; package org.apache.activemq.transport.mqtt.strategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException; import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT; import org.fusesource.mqtt.codec.CONNECT;
/** /**
@ -49,6 +51,19 @@ public interface MQTTSubscriptionStrategy {
*/ */
public void onConnect(CONNECT connect) throws MQTTProtocolException; public void onConnect(CONNECT connect) throws MQTTProtocolException;
/**
* Called for each Topic that a client requests to subscribe to. The strategy needs
* check each Topic for duplicate subscription requests and change of QoS state.
*
* @param topic
* the MQTT Topic instance being subscribed to.
*
* @return the assigned QoS value given to the new subscription.
*
* @throws MQTTProtocolException if an error occurs while processing the subscribe actions.
*/
public byte onSubscribe(Topic topic) throws MQTTProtocolException;
/** /**
* Called when a new Subscription is being requested. This method allows the * Called when a new Subscription is being requested. This method allows the
* strategy to create a specific type of subscription for the client such as * strategy to create a specific type of subscription for the client such as
@ -80,12 +95,12 @@ public interface MQTTSubscriptionStrategy {
/** /**
* Called when a client requests an un-subscribe a previous subscription. * Called when a client requests an un-subscribe a previous subscription.
* *
* @param subscription * @param topicName
* the {@link MQTTSubscription} that is being removed. * the name of the Topic the client wishes to unsubscribe from.
* *
* @throws MQTTProtocolException if an error occurs during the un-subscribe processing. * @throws MQTTProtocolException if an error occurs during the un-subscribe processing.
*/ */
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException; public void onUnSubscribe(String topicName) throws MQTTProtocolException;
/** /**
* Intercepts PUBLISH operations from the client and allows the strategy to map the * Intercepts PUBLISH operations from the client and allows the strategy to map the
@ -136,4 +151,14 @@ public interface MQTTSubscriptionStrategy {
*/ */
public MQTTProtocolConverter getProtocolConverter(); public MQTTProtocolConverter getProtocolConverter();
/**
* Lookup an {@link MQTTSubscription} instance based on known {@link ConsumerId} value.
*
* @param consumer
* the consumer ID to lookup.
*
* @return the {@link MQTTSubscription} for the consumer or null if no subscription exists.
*/
public MQTTSubscription getSubscription(ConsumerId consumer);
} }

View File

@ -97,13 +97,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
destination = new ActiveMQTopic(converted); destination = new ActiveMQTopic(converted);
} }
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(destination); consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true); consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
return protocol.doSubscribe(consumerInfo, topicName, requestedQoS); return doSubscribe(consumerInfo, topicName, requestedQoS);
} }
@Override @Override
@ -120,15 +120,18 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
if (mqttSubscription.getDestination().isTopic()) { if (mqttSubscription.getDestination().isTopic()) {
super.onReSubscribe(mqttSubscription); super.onReSubscribe(mqttSubscription);
} else { } else {
protocol.doUnSubscribe(mqttSubscription); doUnSubscribe(mqttSubscription);
ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo(); ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
consumerInfo.setConsumerId(protocol.getNextConsumerId()); consumerInfo.setConsumerId(getNextConsumerId());
protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS()); doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
} }
} }
@Override @Override
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException { public void onUnSubscribe(String topicName) throws MQTTProtocolException {
MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
if (subscription != null) {
doUnSubscribe(subscription);
if (subscription.getDestination().isQueue()) { if (subscription.getDestination().isQueue()) {
DestinationInfo remove = new DestinationInfo(); DestinationInfo remove = new DestinationInfo();
remove.setConnectionId(protocol.getConnectionId()); remove.setConnectionId(protocol.getConnectionId());
@ -143,6 +146,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
}); });
} }
} }
}
@Override @Override
public ActiveMQDestination onSend(String topicName) { public ActiveMQDestination onSend(String topicName) {
@ -203,13 +207,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
QoS qoS = QoS.valueOf(qosString); QoS qoS = QoS.valueOf(qosString);
LOG.trace("Restoring subscription: {}:{}", topicName, qoS); LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
consumerInfo.setDestination(queue); consumerInfo.setDestination(queue);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true); consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
protocol.doSubscribe(consumerInfo, topicName, qoS); doSubscribe(consumerInfo, topicName, qoS);
// mark this durable subscription as restored by Broker // mark this durable subscription as restored by Broker
restoredQueues.add(queue); restoredQueues.add(queue);