mirror of https://github.com/apache/activemq.git
Resolves issues with the virtual topic subscription strategy especially when subscribing durably to the Topic portion of a virtual destination.
This commit is contained in:
parent
3649f13b81
commit
af999fe2b2
|
@ -17,6 +17,10 @@
|
||||||
package org.apache.activemq.transport.mqtt.strategy;
|
package org.apache.activemq.transport.mqtt.strategy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
@ -24,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.BrokerServiceAware;
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||||
import org.apache.activemq.broker.region.PrefetchSubscription;
|
import org.apache.activemq.broker.region.PrefetchSubscription;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
@ -35,9 +40,12 @@ import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.ExceptionResponse;
|
import org.apache.activemq.command.ExceptionResponse;
|
||||||
import org.apache.activemq.command.RemoveInfo;
|
import org.apache.activemq.command.RemoveInfo;
|
||||||
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
|
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
|
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
|
||||||
|
import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
||||||
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
@ -61,6 +69,7 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
|
||||||
|
|
||||||
protected final ConcurrentMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
|
protected final ConcurrentMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
|
||||||
protected final ConcurrentMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
|
protected final ConcurrentMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
|
||||||
|
protected final Set<String> restoredDurableSubs = Collections.synchronizedSet(new HashSet<String>());
|
||||||
|
|
||||||
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
|
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
|
||||||
|
|
||||||
|
@ -242,4 +251,69 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//----- Durable Subscription management methods --------------------------//
|
||||||
|
|
||||||
|
protected void deleteDurableSubs(List<SubscriptionInfo> subs) {
|
||||||
|
try {
|
||||||
|
for (SubscriptionInfo sub : subs) {
|
||||||
|
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
||||||
|
rsi.setConnectionId(protocol.getConnectionId());
|
||||||
|
rsi.setSubscriptionName(sub.getSubcriptionName());
|
||||||
|
rsi.setClientId(sub.getClientId());
|
||||||
|
protocol.sendToActiveMQ(rsi, new ResponseHandler() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
|
||||||
|
// ignore failures..
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Could not delete the MQTT durable subs.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void restoreDurableSubs(List<SubscriptionInfo> subs) {
|
||||||
|
try {
|
||||||
|
for (SubscriptionInfo sub : subs) {
|
||||||
|
String name = sub.getSubcriptionName();
|
||||||
|
String[] split = name.split(":", 2);
|
||||||
|
QoS qoS = QoS.valueOf(split[0]);
|
||||||
|
onSubscribe(new Topic(split[1], qoS));
|
||||||
|
// mark this durable subscription as restored by Broker
|
||||||
|
restoredDurableSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not restore the MQTT durable subs.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException {
|
||||||
|
List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>();
|
||||||
|
RegionBroker regionBroker;
|
||||||
|
|
||||||
|
try {
|
||||||
|
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
|
||||||
|
List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId);
|
||||||
|
if (subscriptions != null) {
|
||||||
|
for (DurableTopicSubscription subscription : subscriptions) {
|
||||||
|
LOG.debug("Recovered durable sub:{} on connect", subscription);
|
||||||
|
|
||||||
|
SubscriptionInfo info = new SubscriptionInfo();
|
||||||
|
|
||||||
|
info.setDestination(subscription.getActiveMQDestination());
|
||||||
|
info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName());
|
||||||
|
info.setClientId(clientId);
|
||||||
|
|
||||||
|
result.add(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,16 +17,9 @@
|
||||||
package org.apache.activemq.transport.mqtt.strategy;
|
package org.apache.activemq.transport.mqtt.strategy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
|
||||||
import org.apache.activemq.broker.region.TopicRegion;
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
@ -39,20 +32,13 @@ import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
||||||
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
||||||
import org.fusesource.mqtt.client.QoS;
|
import org.fusesource.mqtt.client.QoS;
|
||||||
import org.fusesource.mqtt.client.Topic;
|
|
||||||
import org.fusesource.mqtt.codec.CONNECT;
|
import org.fusesource.mqtt.codec.CONNECT;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default implementation that uses unmapped topic subscriptions.
|
* Default implementation that uses unmapped topic subscriptions.
|
||||||
*/
|
*/
|
||||||
public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
|
public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
|
|
||||||
|
|
||||||
private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConnect(CONNECT connect) throws MQTTProtocolException {
|
public void onConnect(CONNECT connect) throws MQTTProtocolException {
|
||||||
List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
|
List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
|
||||||
|
@ -93,7 +79,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
|
||||||
|
|
||||||
// check whether the Topic has been recovered in restoreDurableSubs
|
// check whether the Topic has been recovered in restoreDurableSubs
|
||||||
// mark subscription available for recovery for duplicate subscription
|
// mark subscription available for recovery for duplicate subscription
|
||||||
if (restoredSubs.remove(destination.getPhysicalName())) {
|
if (restoredDurableSubs.remove(destination.getPhysicalName())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +95,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
|
||||||
// check if the durable sub also needs to be removed
|
// check if the durable sub also needs to be removed
|
||||||
if (subscription.getConsumerInfo().getSubscriptionName() != null) {
|
if (subscription.getConsumerInfo().getSubscriptionName() != null) {
|
||||||
// also remove it from restored durable subscriptions set
|
// also remove it from restored durable subscriptions set
|
||||||
restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
|
restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
|
||||||
|
|
||||||
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
||||||
rsi.setConnectionId(protocol.getConnectionId());
|
rsi.setConnectionId(protocol.getConnectionId());
|
||||||
|
@ -124,67 +110,4 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteDurableSubs(List<SubscriptionInfo> subs) {
|
|
||||||
try {
|
|
||||||
for (SubscriptionInfo sub : subs) {
|
|
||||||
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
|
||||||
rsi.setConnectionId(protocol.getConnectionId());
|
|
||||||
rsi.setSubscriptionName(sub.getSubcriptionName());
|
|
||||||
rsi.setClientId(sub.getClientId());
|
|
||||||
protocol.sendToActiveMQ(rsi, new ResponseHandler() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
|
|
||||||
// ignore failures..
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.warn("Could not delete the MQTT durable subs.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void restoreDurableSubs(List<SubscriptionInfo> subs) {
|
|
||||||
try {
|
|
||||||
for (SubscriptionInfo sub : subs) {
|
|
||||||
String name = sub.getSubcriptionName();
|
|
||||||
String[] split = name.split(":", 2);
|
|
||||||
QoS qoS = QoS.valueOf(split[0]);
|
|
||||||
onSubscribe(new Topic(split[1], qoS));
|
|
||||||
// mark this durable subscription as restored by Broker
|
|
||||||
restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Could not restore the MQTT durable subs.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException {
|
|
||||||
List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>();
|
|
||||||
RegionBroker regionBroker;
|
|
||||||
|
|
||||||
try {
|
|
||||||
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
|
|
||||||
List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId);
|
|
||||||
if (subscriptions != null) {
|
|
||||||
for (DurableTopicSubscription subscription : subscriptions) {
|
|
||||||
LOG.debug("Recovered durable sub:{} on connect", subscription);
|
|
||||||
|
|
||||||
SubscriptionInfo info = new SubscriptionInfo();
|
|
||||||
|
|
||||||
info.setDestination(subscription.getActiveMQDestination());
|
|
||||||
info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName());
|
|
||||||
info.setClientId(clientId);
|
|
||||||
|
|
||||||
result.add(info);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,9 +35,12 @@ import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.DestinationInfo;
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
|
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
|
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
|
||||||
|
import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
|
||||||
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
import org.apache.activemq.transport.mqtt.MQTTSubscription;
|
||||||
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
import org.apache.activemq.transport.mqtt.ResponseHandler;
|
||||||
import org.fusesource.mqtt.client.QoS;
|
import org.fusesource.mqtt.client.QoS;
|
||||||
|
@ -62,35 +65,53 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
@Override
|
@Override
|
||||||
public void onConnect(CONNECT connect) throws MQTTProtocolException {
|
public void onConnect(CONNECT connect) throws MQTTProtocolException {
|
||||||
List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
|
List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
|
||||||
|
List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
|
||||||
|
|
||||||
|
// When clean session is true we must purge all of the client's old Queue subscriptions
|
||||||
|
// and any durable subscriptions created on the VirtualTopic instance as well.
|
||||||
|
|
||||||
if (connect.cleanSession()) {
|
if (connect.cleanSession()) {
|
||||||
deleteDurableQueues(queues);
|
deleteDurableQueues(queues);
|
||||||
|
deleteDurableSubs(subs);
|
||||||
} else {
|
} else {
|
||||||
restoreDurableQueue(queues);
|
restoreDurableQueue(queues);
|
||||||
|
restoreDurableSubs(subs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
|
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
|
||||||
ActiveMQDestination destination = null;
|
ActiveMQDestination destination = null;
|
||||||
|
int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
|
||||||
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
|
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
|
||||||
|
|
||||||
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
|
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
|
||||||
String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." +
|
String converted = convertMQTTToActiveMQ(topicName);
|
||||||
VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
|
if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
|
||||||
|
destination = new ActiveMQTopic(converted);
|
||||||
|
prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
|
||||||
|
consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
|
||||||
|
} else {
|
||||||
|
converted = VIRTUALTOPIC_CONSUMER_PREFIX +
|
||||||
|
protocol.getClientId() + ":" + requestedQoS + "." +
|
||||||
|
VIRTUALTOPIC_PREFIX + converted;
|
||||||
destination = new ActiveMQQueue(converted);
|
destination = new ActiveMQQueue(converted);
|
||||||
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
|
prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
String converted = convertMQTTToActiveMQ(topicName);
|
String converted = convertMQTTToActiveMQ(topicName);
|
||||||
if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
|
if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
|
||||||
converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
|
converted = VIRTUALTOPIC_PREFIX + converted;
|
||||||
}
|
}
|
||||||
destination = new ActiveMQTopic(converted);
|
destination = new ActiveMQTopic(converted);
|
||||||
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
|
prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
consumerInfo.setDestination(destination);
|
consumerInfo.setDestination(destination);
|
||||||
if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
|
if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
|
||||||
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
|
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
|
||||||
|
} else {
|
||||||
|
consumerInfo.setPrefetchSize(prefetch);
|
||||||
}
|
}
|
||||||
consumerInfo.setRetroactive(true);
|
consumerInfo.setRetroactive(true);
|
||||||
consumerInfo.setDispatchAsync(true);
|
consumerInfo.setDispatchAsync(true);
|
||||||
|
@ -103,9 +124,15 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
|
|
||||||
ActiveMQDestination destination = mqttSubscription.getDestination();
|
ActiveMQDestination destination = mqttSubscription.getDestination();
|
||||||
|
|
||||||
|
// check whether the Queue has been recovered in restoreDurableQueue
|
||||||
|
// mark subscription available for recovery for duplicate subscription
|
||||||
|
if (destination.isQueue() && restoredQueues.remove(destination)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// check whether the Topic has been recovered in restoreDurableSubs
|
// check whether the Topic has been recovered in restoreDurableSubs
|
||||||
// mark subscription available for recovery for duplicate subscription
|
// mark subscription available for recovery for duplicate subscription
|
||||||
if (restoredQueues.remove(destination)) {
|
if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,6 +163,20 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
// ignore failures..
|
// ignore failures..
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
} else if (subscription.getConsumerInfo().getSubscriptionName() != null) {
|
||||||
|
// also remove it from restored durable subscriptions set
|
||||||
|
restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
|
||||||
|
|
||||||
|
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
||||||
|
rsi.setConnectionId(protocol.getConnectionId());
|
||||||
|
rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
|
||||||
|
rsi.setClientId(protocol.getClientId());
|
||||||
|
protocol.sendToActiveMQ(rsi, new ResponseHandler() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
|
||||||
|
// ignore failures..
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,7 +212,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
private void deleteDurableQueues(List<ActiveMQQueue> queues) {
|
private void deleteDurableQueues(List<ActiveMQQueue> queues) {
|
||||||
try {
|
try {
|
||||||
for (ActiveMQQueue queue : queues) {
|
for (ActiveMQQueue queue : queues) {
|
||||||
LOG.debug("Removing subscription for {} ",queue.getPhysicalName());
|
LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName());
|
||||||
DestinationInfo removeAction = new DestinationInfo();
|
DestinationInfo removeAction = new DestinationInfo();
|
||||||
removeAction.setConnectionId(protocol.getConnectionId());
|
removeAction.setConnectionId(protocol.getConnectionId());
|
||||||
removeAction.setDestination(queue);
|
removeAction.setDestination(queue);
|
||||||
|
@ -185,7 +226,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.warn("Could not delete the MQTT durable subs.", e);
|
LOG.warn("Could not delete the MQTT queue subsscriptions.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,7 +240,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
tokenizer.nextToken();
|
tokenizer.nextToken();
|
||||||
String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
|
String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
|
||||||
QoS qoS = QoS.valueOf(qosString);
|
QoS qoS = QoS.valueOf(qosString);
|
||||||
LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
|
LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS);
|
||||||
|
|
||||||
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
|
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
|
||||||
consumerInfo.setDestination(queue);
|
consumerInfo.setDestination(queue);
|
||||||
|
@ -216,7 +257,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
|
||||||
restoredQueues.add(queue);
|
restoredQueues.add(queue);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Could not restore the MQTT durable subs.", e);
|
LOG.warn("Could not restore the MQTT queue subscriptions.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.mqtt;
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
|
import org.fusesource.mqtt.client.BlockingConnection;
|
||||||
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
|
import org.fusesource.mqtt.client.QoS;
|
||||||
|
import org.fusesource.mqtt.client.Topic;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,13 +38,6 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - This currently fails on the durable case because we have a hard time
|
|
||||||
// recovering the original Topic name when a client tries to subscribe
|
|
||||||
// durable to a VirtualTopic.* type topic.
|
|
||||||
@Override
|
|
||||||
@Ignore
|
|
||||||
public void testRetainedMessageOnVirtualTopics() throws Exception {}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testSendMQTTReceiveJMS() throws Exception {
|
public void testSendMQTTReceiveJMS() throws Exception {
|
||||||
|
@ -56,4 +55,99 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
|
||||||
public void testJmsMapping() throws Exception {
|
public void testJmsMapping() throws Exception {
|
||||||
doTestJmsMapping("VirtualTopic.test.foo");
|
doTestJmsMapping("VirtualTopic.test.foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testSubscribeOnVirtualTopicAsDurable() throws Exception {
|
||||||
|
MQTT mqtt = createMQTTConnection();
|
||||||
|
mqtt.setClientId("VirtualTopicSubscriber");
|
||||||
|
mqtt.setKeepAlive((short) 2);
|
||||||
|
mqtt.setCleanSession(false);
|
||||||
|
|
||||||
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
final String topicName = "VirtualTopic/foo/bah";
|
||||||
|
|
||||||
|
connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
|
assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.unsubscribe(new String[] { topicName });
|
||||||
|
|
||||||
|
assertTrue("Should remove a durable subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testDurableVirtaulTopicSubIsRecovered() throws Exception {
|
||||||
|
MQTT mqtt = createMQTTConnection();
|
||||||
|
mqtt.setClientId("VirtualTopicSubscriber");
|
||||||
|
mqtt.setKeepAlive((short) 2);
|
||||||
|
mqtt.setCleanSession(false);
|
||||||
|
|
||||||
|
final String topicName = "VirtualTopic/foo/bah";
|
||||||
|
|
||||||
|
{
|
||||||
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
|
assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Should be one inactive subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
{
|
||||||
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
assertTrue("Should recover a durable subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
|
assertTrue("Should still be just one durable subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue