Adds a subscription strategy model where the default is the normal
durable topic subscription based approach or a strategy that maps all
subscriptions and publish operations to a Virtual Topic model.  A
network of brokers can network the Queues instead of having the durable
topics subscriptions repaeted on each Broker.
This commit is contained in:
Timothy Bish 2014-08-06 17:41:19 -04:00
parent fff3c83971
commit 413e4840d6
14 changed files with 1037 additions and 239 deletions

View File

@ -70,9 +70,12 @@ public class MQTTInactivityMonitor extends TransportFilter {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
// for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
// should be sufficient to indicate the connection is still alive. If there were random data, or something
// outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
// for the PINGREQ/RESP frames, the currentCounter will be different
// from previousCounter, and that
// should be sufficient to indicate the connection is still alive.
// If there were random data, or something
// outside the scope of the spec, the wire format unrmarshalling
// would fail, so we don't need to handle
// PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
@ -82,24 +85,21 @@ public class MQTTInactivityMonitor extends TransportFilter {
return;
}
if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
+ next.getRemoteAddress()));
}
});
}
}
};
private boolean allowReadCheck(long elapsed) {
return elapsed > (readGraceTime * 9 / 10);
}
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
}

View File

@ -26,7 +26,7 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@ -44,12 +44,15 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
return "mqtt";
}
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new MQTTNIOTransport(format, socket);
}
@ -58,6 +61,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return result;
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
@ -75,6 +79,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return transport;
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerService);
@ -82,16 +87,17 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -17,11 +17,7 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
@ -34,19 +30,13 @@ import javax.jms.Message;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
@ -60,17 +50,17 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer;
@ -114,9 +104,8 @@ public class MQTTProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
private final Map<String, ActiveMQTopic> activeMQTopicMap = new LRUCache<String, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
@ -136,6 +125,15 @@ public class MQTTProtocolConverter {
private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics;
private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
/*
* Subscription strategy configuration element.
* > mqtt-default-subscriptions
* > mqtt-virtual-topic-subscriptions
*/
private String subscriptionStrategyName = "mqtt-default-subscriptions";
private MQTTSubscriptionStrategy subsciptionStrategy;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
@ -149,22 +147,26 @@ public class MQTTProtocolConverter {
}
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
public void sendToActiveMQ(Command command, ResponseHandler handler) {
// Lets intercept message send requests..
if (command instanceof ActiveMQMessage) {
ActiveMQMessage msg = (ActiveMQMessage) command;
if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) {
// We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
// specification requirements
if (handler != null) {
try {
handler.onResponse(this, new Response());
} catch (IOException e) {
e.printStackTrace();
try {
if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination())) {
// We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
// specification requirements for system assigned destinations.
if (handler != null) {
try {
handler.onResponse(this, new Response());
} catch (IOException e) {
e.printStackTrace();
}
}
return;
}
return;
} catch (IOException e) {
e.printStackTrace();
}
}
@ -189,54 +191,43 @@ public class MQTTProtocolConverter {
*/
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
switch (frame.messageType()) {
case PINGREQ.TYPE: {
case PINGREQ.TYPE:
LOG.debug("Received a ping from client: " + getClientId());
sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
}
case CONNECT.TYPE: {
case CONNECT.TYPE:
CONNECT connect = new CONNECT().decode(frame);
onMQTTConnect(connect);
LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
break;
}
case DISCONNECT.TYPE: {
case DISCONNECT.TYPE:
LOG.debug("MQTT Client {} disconnecting", getClientId());
onMQTTDisconnect();
break;
}
case SUBSCRIBE.TYPE: {
case SUBSCRIBE.TYPE:
onSubscribe(new SUBSCRIBE().decode(frame));
break;
}
case UNSUBSCRIBE.TYPE: {
case UNSUBSCRIBE.TYPE:
onUnSubscribe(new UNSUBSCRIBE().decode(frame));
break;
}
case PUBLISH.TYPE: {
case PUBLISH.TYPE:
onMQTTPublish(new PUBLISH().decode(frame));
break;
}
case PUBACK.TYPE: {
case PUBACK.TYPE:
onMQTTPubAck(new PUBACK().decode(frame));
break;
}
case PUBREC.TYPE: {
case PUBREC.TYPE:
onMQTTPubRec(new PUBREC().decode(frame));
break;
}
case PUBREL.TYPE: {
case PUBREL.TYPE:
onMQTTPubRel(new PUBREL().decode(frame));
break;
}
case PUBCOMP.TYPE: {
case PUBCOMP.TYPE:
onMQTTPubComp(new PUBCOMP().decode(frame));
break;
}
default: {
default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
}
}
}
@ -332,54 +323,19 @@ public class MQTTProtocolConverter {
connected.set(true);
getMQTTTransport().sendToMQTT(ack.encode());
List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
if (connect.cleanSession()) {
packetIdGenerator.stopClientSession(getClientId());
deleteDurableSubs(subs);
} else {
packetIdGenerator.startClientSession(getClientId());
restoreDurableSubs(subs);
}
getSubscriptionStrategy().onConnect(connect);
}
});
}
});
}
public void deleteDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sub.getSubcriptionName());
rsi.setClientId(sub.getClientId());
sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
} catch (Throwable e) {
LOG.warn("Could not delete the MQTT durable subs.", e);
}
}
public void restoreDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
String name = sub.getSubcriptionName();
String[] split = name.split(":", 2);
QoS qoS = QoS.valueOf(split[0]);
onSubscribe(new Topic(split[1], qoS));
// mark this durable subscription as restored by Broker
restoredSubs.add(split[1]);
}
} catch (IOException e) {
LOG.warn("Could not restore the MQTT durable subs.", e);
}
}
void onMQTTDisconnect() throws MQTTProtocolException {
if (connected.get()) {
connected.set(false);
@ -408,42 +364,41 @@ public class MQTTProtocolConverter {
} else {
LOG.warn("No topics defined for Subscription " + command);
}
}
byte onSubscribe(final Topic topic) throws MQTTProtocolException {
public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
final String topicName = topic.name().toString();
final QoS topicQoS = topic.qos();
final String destinationName = topic.name().toString();
final QoS requestedQoS = topic.qos();
if (mqttSubscriptionByTopic.containsKey(topicName)) {
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
if (topicQoS != mqttSubscription.qos()) {
if (mqttSubscriptionByTopic.containsKey(destinationName)) {
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
if (requestedQoS != mqttSubscription.getQoS()) {
// remove old subscription as the QoS has changed
onUnSubscribe(topicName);
onUnSubscribe(destinationName);
} else {
// duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages
resendRetainedMessages(mqttSubscription);
return (byte) topicQoS.ordinal();
try {
getSubscriptionStrategy().onReSubscribe(mqttSubscription);
} catch (IOException e) {
throw new MQTTProtocolException("Failed to find subscription strategy", true, e);
}
return (byte) requestedQoS.ordinal();
}
}
ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when cleansession is false
if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
try {
return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS);
} catch (IOException e) {
throw new MQTTProtocolException("Failed while intercepting subscribe", true, e);
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, topicQoS, consumerInfo);
}
public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException {
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo);
// optimistic add to local maps first to be able to handle commands in onActiveMQCommand
subscriptionsByConsumerId.put(id, mqttSubscription);
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
final byte[] qos = {-1};
@ -453,79 +408,24 @@ public class MQTTProtocolConverter {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.warn("Error subscribing to " + topicName, throwable);
LOG.warn("Error subscribing to {}", topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
qos[0] = (byte) topicQoS.ordinal();
qos[0] = (byte) qoS.ordinal();
}
}
});
if (qos[0] == SUBSCRIBE_ERROR) {
// remove from local maps if subscribe failed
subscriptionsByConsumerId.remove(id);
subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
}
private void resendRetainedMessages(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
ActiveMQDestination destination = mqttSubscription.getDestination();
// check whether the Topic has been recovered in restoreDurableSubs
// mark subscription available for recovery for duplicate subscription
if (restoredSubs.remove(destination.getPhysicalName())) {
return;
}
String topicName = mqttSubscription.getTopicName();
// get TopicRegion
RegionBroker regionBroker;
try {
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
} catch (Exception e) {
throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
}
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
final ConsumerId consumerId = consumerInfo.getConsumerId();
// use actual client id used to create connection to lookup connection context
final String connectionInfoClientId = connectionInfo.getClientId();
final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
// get all matching Topics
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
// recover retroactive messages for matching subscription
for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try {
if (dest instanceof org.apache.activemq.broker.region.Topic) {
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
} else if (dest instanceof VirtualTopicInterceptor) {
((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
}
if (subscription instanceof PrefetchSubscription) {
// request dispatch for prefetch subs
PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
prefetchSubscription.dispatchPending();
}
} catch (Exception e) {
throw new MQTTProtocolException("Error recovering retained messages for " +
dest.getName() + ": " + e.getMessage(), false, e);
}
break;
}
}
}
}
void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
UTF8Buffer[] topics = command.topics();
if (topics != null) {
@ -538,33 +438,38 @@ public class MQTTProtocolConverter {
sendToMQTT(ack.encode());
}
void onUnSubscribe(String topicName) {
MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
if (subs != null) {
ConsumerInfo info = subs.getConsumerInfo();
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
}
RemoveInfo removeInfo = null;
if (info != null) {
removeInfo = info.createRemoveCommand();
}
sendToActiveMQ(removeInfo, null);
public void onUnSubscribe(String topicName) {
MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
if (subscription != null) {
doUnSubscribe(subscription);
// check if the durable sub also needs to be removed
if (subs.getConsumerInfo().getSubscriptionName() != null) {
// also remove it from restored durable subscriptions set
restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
rsi.setClientId(connectionInfo.getClientId());
sendToActiveMQ(rsi, null);
// check if the broker side of the subscription needs to be removed
try {
getSubscriptionStrategy().onUnSubscribe(subscription);
} catch (IOException e) {
// Ignore
}
}
}
public void doUnSubscribe(MQTTSubscription subscription) {
mqttSubscriptionByTopic.remove(subscription.getTopicName());
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
}
RemoveInfo removeInfo = null;
if (info != null) {
removeInfo = info.createRemoveCommand();
}
sendToActiveMQ(removeInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
/**
* Dispatch an ActiveMQ command
*/
@ -610,7 +515,7 @@ public class MQTTProtocolConverter {
} else if (command.isBrokerInfo()) {
//ignore
} else {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
LOG.debug("Do not know how to process ActiveMQ Command {}", command);
}
}
@ -647,7 +552,7 @@ public class MQTTProtocolConverter {
ack = publisherRecs.remove(command.messageId());
}
if (ack == null) {
LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
LOG.warn("Unknown PUBREL: {} received", command.messageId());
}
PUBCOMP pubcomp = new PUBCOMP();
pubcomp.messageId(command.messageId());
@ -680,16 +585,23 @@ public class MQTTProtocolConverter {
msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
}
ActiveMQTopic topic;
synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName());
if (topic == null) {
ActiveMQDestination destination;
synchronized (activeMQDestinationMap) {
destination = activeMQDestinationMap.get(command.topicName());
if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
topic = new ActiveMQTopic(topicName);
activeMQTopicMap.put(command.topicName().toString(), topic);
try {
destination = getSubscriptionStrategy().onSend(topicName);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
activeMQDestinationMap.put(command.topicName().toString(), destination);
}
}
msg.setJMSDestination(topic);
msg.setJMSDestination(destination);
msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
return msg;
}
@ -714,7 +626,8 @@ public class MQTTProtocolConverter {
synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination());
if (topicName == null) {
topicName = MQTTProtocolSupport.convertActiveMQToMQTT(message.getDestination().getPhysicalName());
String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination());
topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
mqttTopicMap.put(message.getJMSDestination(), topicName);
}
}
@ -803,9 +716,7 @@ public class MQTTProtocolConverter {
long keepAliveMS = keepAliveSeconds * 1000;
if (LOG.isDebugEnabled()) {
LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms");
}
LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
try {
// if we have a default keep-alive value, and the client is trying to turn off keep-alive,
@ -822,12 +733,8 @@ public class MQTTProtocolConverter {
monitor.setReadGraceTime(readGracePeriod);
monitor.startMonitorThread();
if (LOG.isDebugEnabled()) {
LOG.debug("MQTT Client " + getClientId() +
" established heart beat of " + keepAliveMS +
" ms (" + keepAliveMS + "ms + " + readGracePeriod +
"ms grace period)");
}
LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)",
new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
@ -835,9 +742,7 @@ public class MQTTProtocolConverter {
void handleException(Throwable exception, MQTTFrame command) {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail", exception);
}
LOG.debug("Exception detail", exception);
if (connected.get() && connectionInfo != null) {
connected.set(false);
@ -861,7 +766,6 @@ public class MQTTProtocolConverter {
}
ResponseHandler createResponseHandler(final PUBLISH command) {
if (command != null) {
switch (command.qos()) {
case AT_LEAST_ONCE:
@ -944,6 +848,22 @@ public class MQTTProtocolConverter {
return connectionId;
}
public ConsumerId getNextConsumerId() {
return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
}
public boolean isCleanSession() {
return this.connect.cleanSession();
}
public String getSubscriptionStrategyName() {
return subscriptionStrategyName;
}
public void setSubscriptionStrategyName(String name) {
this.subscriptionStrategyName = name;
}
public String getClientId() {
if (clientId == null) {
if (connect != null && connect.clientId() != null) {
@ -954,4 +874,33 @@ public class MQTTProtocolConverter {
}
return clientId;
}
protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException {
if (subsciptionStrategy == null) {
synchronized (STRATAGY_FINDER) {
if (subsciptionStrategy != null) {
return subsciptionStrategy;
}
MQTTSubscriptionStrategy strategy = null;
if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
try {
strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
if (strategy instanceof BrokerServiceAware) {
((BrokerServiceAware)strategy).setBrokerService(brokerService);
}
strategy.initialize(this);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
} else {
throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
}
this.subsciptionStrategy = strategy;
}
}
return subsciptionStrategy;
}
}

View File

@ -132,7 +132,12 @@ public class MQTTSubscription {
/**
* @return the assigned QoS value for this subscription.
*/
public QoS qos() {
public QoS getQoS() {
return qos;
}
@Override
public String toString() {
return "MQTT Sub: topic[" + topicName + "] -> [" + consumerInfo.getDestination() + "]";
}
}

View File

@ -17,13 +17,13 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@ -33,8 +33,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.ServerSocketFactory;
/**
* A <a href="http://mqtt.org/">MQTT</a> transport factory
*/
@ -42,16 +40,19 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
return "mqtt";
}
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
result.setAllowLinkStealing(true);
return result;
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerService);
@ -59,6 +60,7 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@ -79,10 +81,8 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
@ -29,7 +30,20 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.mqtt.codec.*;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -197,6 +211,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
protocolConverter.setPublishDollarTopics(publishDollarTopics);
}
public String getSubscriptionStrategyName() {
return protocolConverter != null ? protocolConverter.getSubscriptionStrategyName() : "default";
}
public void setSubscriptionStrategyName(String name) {
protocolConverter.setSubscriptionStrategyName(name);
}
public int getActiveMQSubscriptionPrefetch() {
return protocolConverter.getActiveMQSubscriptionPrefetch();
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.strategy;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
/**
* Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing
* the base functionality that is common to most implementations.
*/
public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware {
protected MQTTProtocolConverter protocol;
protected BrokerService brokerService;
@Override
public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException {
setProtocolConverter(protocol);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public void setProtocolConverter(MQTTProtocolConverter parent) {
this.protocol = parent;
}
@Override
public MQTTProtocolConverter getProtocolConverter() {
return protocol;
}
@Override
public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
String topicName = mqttSubscription.getTopicName();
// get TopicRegion
RegionBroker regionBroker;
try {
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
} catch (Exception e) {
throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
}
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
final ConsumerId consumerId = consumerInfo.getConsumerId();
// use actual client id used to create connection to lookup connection
// context
final String connectionInfoClientId = protocol.getClientId();
final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
// get all matching Topics
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations =
topicRegion.getDestinations(mqttSubscription.getDestination());
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
// recover retroactive messages for matching subscription
for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try {
if (dest instanceof org.apache.activemq.broker.region.Topic) {
((org.apache.activemq.broker.region.Topic) dest).recoverRetroactiveMessages(connectionContext, subscription);
} else if (dest instanceof VirtualTopicInterceptor) {
((VirtualTopicInterceptor) dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
}
if (subscription instanceof PrefetchSubscription) {
// request dispatch for prefetch subs
PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
prefetchSubscription.dispatchPending();
}
} catch (Exception e) {
throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e);
}
break;
}
}
}
}
@Override
public ActiveMQDestination onSend(String topicName) {
return new ActiveMQTopic(topicName);
}
@Override
public String onSend(ActiveMQDestination destination) {
return destination.getPhysicalName();
}
@Override
public boolean isControlTopic(ActiveMQDestination destination) {
return destination.getPhysicalName().startsWith("$");
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.strategy;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation that uses unmapped topic subscriptions.
*/
public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
@Override
public void onConnect(CONNECT connect) throws MQTTProtocolException {
List<SubscriptionInfo> subs;
try {
subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), protocol.getClientId());
} catch (IOException e) {
throw new MQTTProtocolException("Error loading store subscriptions", true, e);
}
if (connect.cleanSession()) {
deleteDurableSubs(subs);
} else {
restoreDurableSubs(subs);
}
}
@Override
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when clean session is false
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
}
return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
}
@Override
public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
ActiveMQDestination destination = mqttSubscription.getDestination();
// check whether the Topic has been recovered in restoreDurableSubs
// mark subscription available for recovery for duplicate subscription
if (restoredSubs.remove(destination.getPhysicalName())) {
return;
}
super.onReSubscribe(mqttSubscription);
}
@Override
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
// check if the durable sub also needs to be removed
if (subscription.getConsumerInfo().getSubscriptionName() != null) {
// also remove it from restored durable subscriptions set
restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(protocol.getConnectionId());
rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
rsi.setClientId(protocol.getClientId());
protocol.sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
}
private void deleteDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(protocol.getConnectionId());
rsi.setSubscriptionName(sub.getSubcriptionName());
rsi.setClientId(sub.getClientId());
protocol.sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
} catch (Throwable e) {
LOG.warn("Could not delete the MQTT durable subs.", e);
}
}
private void restoreDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
String name = sub.getSubcriptionName();
String[] split = name.split(":", 2);
QoS qoS = QoS.valueOf(split[0]);
protocol.onSubscribe(new Topic(split[1], qoS));
// mark this durable subscription as restored by Broker
restoredSubs.add(split[1]);
}
} catch (IOException e) {
LOG.warn("Could not restore the MQTT durable subs.", e);
}
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.strategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT;
/**
* Subscription management strategy used to control how MQTT clients
* subscribe to destination and how messages are addressed in order to
* arrive on the appropriate destinations.
*/
public interface MQTTSubscriptionStrategy {
/**
* Initialize the strategy before first use.
*
* @param protocol
* the MQTTProtocolConverter that is initializing the strategy
*
* @throws MQTTProtocolException if an error occurs during initialization.
*/
public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException;
/**
* Allows the strategy to perform any needed actions on client connect
* prior to the CONNACK frame being sent back such as recovering old
* subscriptions and performing any clean session actions.
*
* @throws MQTTProtocolException if an error occurs while processing the connect actions.
*/
public void onConnect(CONNECT connect) throws MQTTProtocolException;
/**
* Called when a new Subscription is being requested. This method allows the
* strategy to create a specific type of subscription for the client such as
* mapping topic subscriptions to Queues etc.
*
* @param topicName
* the requested Topic name to subscribe to.
* @param requestedQoS
* the QoS level that the client has requested for this subscription.
*
* @return the assigned QoS value given to the new subscription
*
* @throws MQTTProtocolException if an error occurs while processing the subscribe actions.
*/
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException;
/**
* Called when a client sends a duplicate subscribe request which should
* force any retained messages on that topic to be replayed again as though
* the client had just subscribed for the first time. The method should
* not unsubscribe the client as it might miss messages sent while the
* subscription is being recreated.
*
* @param subscription
* the MQTTSubscription that contains the subscription state.
*/
public void onReSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
/**
* Called when a client requests an un-subscribe a previous subscription.
*
* @param subscription
* the {@link MQTTSubscription} that is being removed.
*
* @throws MQTTProtocolException if an error occurs during the un-subscribe processing.
*/
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
/**
* Intercepts PUBLISH operations from the client and allows the strategy to map the
* target destination so that the send operation will land in the destinations that
* this strategy has mapped the incoming subscribe requests to.
*
* @param topicName
* the targeted Topic that the client sent the message to.
*
* @return an ActiveMQ Topic instance that lands the send in the correct destinations.
*/
public ActiveMQDestination onSend(String topicName);
/**
* Intercepts send operations from the broker and allows the strategy to map the
* target topic name so that the client sees a valid Topic name.
*
* @param destination
* the destination that the message was dispatched from
*
* @return an Topic name that is valid for the receiving client.
*/
public String onSend(ActiveMQDestination destination);
/**
* Allows the protocol handler to interrogate an destination name to determine if it
* is equivalent to the MQTT control topic (starts with $). Since the mapped destinations
* that the strategy might alter the naming scheme the strategy must provide a way to
* reverse map and determine if the destination was originally an MQTT control topic.
*
* @param destination
* the destination to query.
*
* @return true if the destination is an MQTT control topic.
*/
public boolean isControlTopic(ActiveMQDestination destination);
/**
* Sets the {@link MQTTProtocolConverter} that is the parent of this strategy object.
*
* @param parent
* the {@link MQTTProtocolConverter} that owns this strategy.
*/
public void setProtocolConverter(MQTTProtocolConverter parent);
/**
* @return the {@link MQTTProtocolConverter} that owns this strategy.
*/
public MQTTProtocolConverter getProtocolConverter();
}

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.strategy;
import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Subscription strategy that converts all MQTT subscribes that would be durable to
* Virtual Topic Queue subscriptions. Also maps all publish requests to be prefixed
* with the VirtualTopic. prefix unless already present.
*/
public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);
private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());
@Override
public void onConnect(CONNECT connect) throws MQTTProtocolException {
List<ActiveMQQueue> queues;
try {
queues = PersistenceAdapterSupport.listQueues(brokerService.getPersistenceAdapter(), new PersistenceAdapterSupport.DestinationMatcher() {
@Override
public boolean matches(ActiveMQDestination destination) {
if (destination.getPhysicalName().startsWith("Consumer." + protocol.getClientId())) {
LOG.debug("Recovered client sub: {}", destination.getPhysicalName());
return true;
}
return false;
}
});
} catch (IOException e) {
throw new MQTTProtocolException("Error restoring durable subscriptions", true, e);
}
if (connect.cleanSession()) {
deleteDurableQueues(queues);
} else {
restoreDurableQueue(queues);
}
}
@Override
public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
ActiveMQDestination destination = null;
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." +
VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
destination = new ActiveMQQueue(converted);
} else {
String converted = convertMQTTToActiveMQ(topicName);
if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
}
destination = new ActiveMQTopic(converted);
}
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
}
@Override
public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
ActiveMQDestination destination = mqttSubscription.getDestination();
// check whether the Topic has been recovered in restoreDurableSubs
// mark subscription available for recovery for duplicate subscription
if (restoredQueues.remove(destination)) {
return;
}
if (mqttSubscription.getDestination().isTopic()) {
super.onReSubscribe(mqttSubscription);
} else {
protocol.doUnSubscribe(mqttSubscription);
ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
consumerInfo.setConsumerId(protocol.getNextConsumerId());
protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
}
}
@Override
public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
if (subscription.getDestination().isQueue()) {
DestinationInfo remove = new DestinationInfo();
remove.setConnectionId(protocol.getConnectionId());
remove.setDestination(subscription.getDestination());
remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
protocol.sendToActiveMQ(remove, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
}
@Override
public ActiveMQDestination onSend(String topicName) {
if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
} else {
return new ActiveMQTopic(topicName);
}
}
@Override
public String onSend(ActiveMQDestination destination) {
String amqTopicName = destination.getPhysicalName();
if (amqTopicName.startsWith(VIRTUALTOPIC_PREFIX)) {
amqTopicName = amqTopicName.substring(VIRTUALTOPIC_PREFIX.length());
}
return amqTopicName;
}
@Override
public boolean isControlTopic(ActiveMQDestination destination) {
String destinationName = destination.getPhysicalName();
if (destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$")) {
return true;
}
return false;
}
private void deleteDurableQueues(List<ActiveMQQueue> queues) {
try {
for (ActiveMQQueue queue : queues) {
DestinationInfo removeAction = new DestinationInfo();
removeAction.setConnectionId(protocol.getConnectionId());
removeAction.setDestination(queue);
removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
protocol.sendToActiveMQ(removeAction, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
} catch (Throwable e) {
LOG.warn("Could not delete the MQTT durable subs.", e);
}
}
private void restoreDurableQueue(List<ActiveMQQueue> queues) {
try {
for (ActiveMQQueue queue : queues) {
String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length());
StringTokenizer tokenizer = new StringTokenizer(name);
tokenizer.nextToken(":.");
String qosString = tokenizer.nextToken();
tokenizer.nextToken();
String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
QoS qoS = QoS.valueOf(qosString);
LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
consumerInfo.setDestination(queue);
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
protocol.doSubscribe(consumerInfo, topicName, qoS);
// mark this durable subscription as restored by Broker
restoredQueues.add(queue);
}
} catch (IOException e) {
LOG.warn("Could not restore the MQTT durable subs.", e);
}
}
}

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy

View File

@ -62,6 +62,7 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -153,7 +154,7 @@ public class MQTTTest extends MQTTTestSupport {
publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
}
latch.await(10, TimeUnit.SECONDS);
latch.await(20, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscriptionProvider.disconnect();
publishProvider.disconnect();
@ -488,13 +489,14 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 120 * 1000)
public void testRetainedMessage() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short) 2);
mqtt.setKeepAlive((short) 60);
final String RETAIN = "RETAIN";
final String TOPICA = "TopicA";
final String[] clientIds = { null, "foo", "durable" };
for (String clientId : clientIds) {
LOG.info("Testing now with Client ID: {}", clientId);
mqtt.setClientId(clientId);
mqtt.setCleanSession(!"durable".equals(clientId));
@ -552,6 +554,76 @@ public class MQTTTest extends MQTTTestSupport {
}
}
@Ignore
@Test(timeout = 120 * 1000)
public void testRetainedMessageOnVirtualTopics() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short) 60);
final String RETAIN = "RETAIN";
final String TOPICA = "VirtualTopic/TopicA";
final String[] clientIds = { null, "foo", "durable" };
for (String clientId : clientIds) {
LOG.info("Testing now with Client ID: {}", clientId);
mqtt.setClientId(clientId);
mqtt.setCleanSession(!"durable".equals(clientId));
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
// set retained message and check
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
// test duplicate subscription
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(15000, TimeUnit.MILLISECONDS);
assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
connection.unsubscribe(new String[]{TOPICA});
// clear retained message and check that we don't receive it
connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(500, TimeUnit.MILLISECONDS);
assertNull("Retained message not cleared for " + clientId, msg);
connection.unsubscribe(new String[]{TOPICA});
// set retained message again and check
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No reset retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
// re-connect and check
connection.disconnect();
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No reset retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
LOG.info("Test now unsubscribing from: {} for the last time", TOPICA);
connection.unsubscribe(new String[]{TOPICA});
connection.disconnect();
}
}
@Test(timeout = 60 * 1000)
public void testUniqueMessageIds() throws Exception {
MQTT mqtt = createMQTTConnection();
@ -914,9 +986,12 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
doTestSendMQTTReceiveJMS("foo.*");
}
public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
final String DESTINATION_NAME = "foo.*";
// send retained message
final String RETAINED = "RETAINED";
@ -927,7 +1002,7 @@ public class MQTTTest extends MQTTTestSupport {
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
javax.jms.Topic jmsTopic = s.createTopic(destinationName);
MessageConsumer consumer = s.createConsumer(jmsTopic);
// check whether we received retained message on JMS subscribe
@ -952,6 +1027,10 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 2 * 60 * 1000)
public void testSendJMSReceiveMQTT() throws Exception {
doTestSendJMSReceiveMQTT("foo.far");
}
public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
@ -959,7 +1038,7 @@ public class MQTTTest extends MQTTTestSupport {
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
javax.jms.Topic jmsTopic = s.createTopic(destinationName);
MessageProducer producer = s.createProducer(jmsTopic);
// send retained message from JMS
@ -1130,10 +1209,14 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
doTestJmsMapping("test.foo");
}
public void doTestJmsMapping(String destinationName) throws Exception {
// start up jms consumer
Connection jmsConn = cf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createTopic("test.foo");
Destination dest = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(dest);
jmsConn.start();

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Run the basic tests with the NIO Transport.
*/
public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
@Override
@Before
public void setUp() throws Exception {
protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
super.setUp();
}
// TODO - This currently fails on the durable case because we have a hard time
// recovering the original Topic name when a client tries to subscribe
// durable to a VirtualTopic.* type topic.
@Override
@Ignore
public void testRetainedMessageOnVirtualTopics() throws Exception {}
@Override
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
doTestSendMQTTReceiveJMS("VirtualTopic.foo.*");
}
@Override
@Test(timeout = 2 * 60 * 1000)
public void testSendJMSReceiveMQTT() throws Exception {
doTestSendJMSReceiveMQTT("VirtualTopic.foo.far");
}
@Override
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
doTestJmsMapping("VirtualTopic.test.foo");
}
}