git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1309566 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-04-04 19:46:26 +00:00
parent 401f768414
commit 0f5b406b43
12 changed files with 552 additions and 2140 deletions

View File

@ -61,10 +61,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean useKeepAlive = true;
private boolean keepAliveResponseRequired; private boolean keepAliveResponseRequired;
protected WireFormat wireFormat;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastRunTime;
@ -99,7 +97,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next); super(next);
this.wireFormat = wireFormat;
} }
public void start() throws Exception { public void start() throws Exception {
@ -198,9 +195,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
public void setUseKeepAlive(boolean val) {
useKeepAlive = val;
}
public long getReadCheckTime() { public long getReadCheckTime() {
return readCheckTime; return readCheckTime;
@ -231,7 +225,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
return this.monitorStarted.get(); return this.monitorStarted.get();
} }
protected synchronized void startMonitorThread() throws IOException { synchronized void startMonitorThread() {
if (monitorStarted.get()) { if (monitorStarted.get()) {
return; return;
} }
@ -258,7 +252,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
protected synchronized void stopMonitorThread() { synchronized void stopMonitorThread() {
if (monitorStarted.compareAndSet(true, false)) { if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) { if (readCheckerTask != null) {
readCheckerTask.cancel(); readCheckerTask.cancel();

View File

@ -16,37 +16,30 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.transport.stomp.FrameTranslator;
import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
import org.apache.activemq.transport.stomp.ProtocolException; import org.apache.activemq.transport.stomp.ProtocolException;
import org.apache.activemq.transport.stomp.StompSubscription; import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.codec.CONNACK; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT; import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.DISCONNECT; import org.fusesource.mqtt.codec.*;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,66 +48,46 @@ class MQTTProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final String BROKER_VERSION;
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
static {
InputStream in = null;
String version = "5.6.0";
if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
try {
version = reader.readLine();
} catch (Exception e) {
}
}
BROKER_VERSION = version;
}
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1); private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>(); private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(); private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(); private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
private final MQTTTransport mqttTransport; private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
private int lastCommandId; private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean connected = new AtomicBoolean(false);
private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); private ConnectionInfo connectionInfo = new ConnectionInfo();
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final BrokerContext brokerContext;
private String version = "1.0";
ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect; private CONNECT connect;
private String clientId; private String clientId;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
this.mqttTransport = mqttTransport; this.mqttTransport = mqttTransport;
this.brokerContext = brokerContext;
} }
protected int generateCommandId() { int generateCommandId() {
synchronized (commnadIdMutex) { synchronized (commnadIdMutex) {
return lastCommandId++; return lastCommandId++;
} }
} }
protected void sendToActiveMQ(Command command, ResponseHandler handler) { void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId()); command.setCommandId(generateCommandId());
if (handler != null) { if (handler != null) {
command.setResponseRequired(true); command.setResponseRequired(true);
@ -123,6 +96,14 @@ class MQTTProtocolConverter {
mqttTransport.sendToActiveMQ(command); mqttTransport.sendToActiveMQ(command);
} }
void sendToMQTT(MQTTFrame frame) {
try {
mqttTransport.sendToMQTT(frame);
} catch (IOException e) {
LOG.warn("Failed to send frame " + frame, e);
}
}
/** /**
* Convert a MQTT command * Convert a MQTT command
@ -138,6 +119,7 @@ class MQTTProtocolConverter {
} }
case CONNECT.TYPE: { case CONNECT.TYPE: {
onMQTTConnect(new CONNECT().decode(frame)); onMQTTConnect(new CONNECT().decode(frame));
LOG.debug("MQTT Client " + getClientId() + " connected.");
break; break;
} }
case DISCONNECT.TYPE: { case DISCONNECT.TYPE: {
@ -145,6 +127,22 @@ class MQTTProtocolConverter {
stopTransport(); stopTransport();
break; break;
} }
case SUBSCRIBE.TYPE: {
onSubscribe(new SUBSCRIBE().decode(frame));
break;
}
case UNSUBSCRIBE.TYPE: {
onUnSubscribe(new UNSUBSCRIBE().decode(frame));
break;
}
case PUBLISH.TYPE: {
onMQTTPublish(new PUBLISH().decode(frame));
break;
}
case PUBACK.TYPE: {
onMQTTPubAck(new PUBACK().decode(frame));
break;
}
default: default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
} }
@ -152,7 +150,7 @@ class MQTTProtocolConverter {
} }
protected void onMQTTConnect(final CONNECT connect) throws ProtocolException { void onMQTTConnect(final CONNECT connect) throws ProtocolException {
if (connected.get()) { if (connected.get()) {
throw new ProtocolException("All ready connected."); throw new ProtocolException("All ready connected.");
@ -171,15 +169,13 @@ class MQTTProtocolConverter {
String passswd = ""; String passswd = "";
if (connect.password() != null) { if (connect.password() != null) {
passswd = connect.password().toString(); passswd = connect.password().toString();
} }
configureInactivityMonitor(connect.keepAlive()); configureInactivityMonitor(connect.keepAlive());
connectionInfo.setConnectionId(connectionId); connectionInfo.setConnectionId(connectionId);
if (clientId != null && clientId.isEmpty() == false) { if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId); connectionInfo.setClientId(clientId);
} else { } else {
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
@ -222,6 +218,7 @@ class MQTTProtocolConverter {
CONNACK ack = new CONNACK(); CONNACK ack = new CONNACK();
ack.code(CONNACK.Code.CONNECTION_ACCEPTED); ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
connected.set(true);
getMQTTTransport().sendToMQTT(ack.encode()); getMQTTTransport().sendToMQTT(ack.encode());
} }
@ -231,13 +228,88 @@ class MQTTProtocolConverter {
}); });
} }
void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
SUBACK result = new SUBACK();
Topic[] topics = command.topics();
if (topics != null) {
byte[] qos = new byte[topics.length];
for (int i = 0; i < topics.length; i++) {
qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
}
SUBACK ack = new SUBACK();
ack.messageId(command.messageId());
ack.grantedQos(qos);
try {
getMQTTTransport().sendToMQTT(ack.encode());
} catch (IOException e) {
LOG.warn("Couldn't send SUBACK for " + command, e);
}
} else {
LOG.warn("No topics defined for Subscription " + command);
}
}
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
if (destination == null) {
throw new MQTTProtocolException("Invalid Destination.");
}
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
if (!connect.cleanSession() && (connect.clientId() != null)) {
//by default subscribers are persistent
consumerInfo.setSubscriptionName(connect.clientId().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
sendToActiveMQ(consumerInfo, null);
return topic.qos();
}
void onUnSubscribe(UNSUBSCRIBE command) {
UTF8Buffer[] topics = command.topics();
if (topics != null) {
for (int i = 0; i < topics.length; i++) {
onUnSubscribe(topics[i]);
}
}
UNSUBACK ack = new UNSUBACK();
ack.messageId(command.messageId());
sendToMQTT(ack.encode());
}
void onUnSubscribe(UTF8Buffer topicName) {
MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
if (subs != null) {
ConsumerInfo info = subs.getConsumerInfo();
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
}
RemoveInfo removeInfo = info.createRemoveCommand();
sendToActiveMQ(removeInfo, null);
}
}
/** /**
* Dispatch a ActiveMQ command * Dispatch a ActiveMQ command
*/ */
public void onActiveMQCommand(Command command) throws IOException, JMSException { public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) { if (command.isResponse()) {
Response response = (Response) command; Response response = (Response) command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
@ -252,28 +324,59 @@ class MQTTProtocolConverter {
} }
} else if (command.isMessageDispatch()) { } else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command; MessageDispatch md = (MessageDispatch) command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) { if (sub != null) {
//sub.onMessageDispatch(md); MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage());
if (ack != null) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
}
}
getMQTTTransport().sendToMQTT(publish.encode());
} }
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection? // Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError) command).getException(); Throwable exception = ((ConnectionError) command).getException();
handleException(exception, null); handleException(exception, null);
} else if (command.isBrokerInfo()) {
//ignore
} else { } else {
LOG.debug("Do not know how to process ActiveMQ Command " + command); LOG.debug("Do not know how to process ActiveMQ Command " + command);
} }
} }
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
}
public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException { void onMQTTPubAck(PUBACK command) {
short messageId = command.messageId();
MessageAck ack = null;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
}
if (ack != null) {
getMQTTTransport().sendToActiveMQ(ack);
}
}
ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
StringBuilder msgId = new StringBuilder();
msgId.append("ID:").append(getClientId()).append(":").append(command.messageId());
msg.setJMSMessageID(msgId.toString());
msg.setJMSPriority(4);
//ActiveMQTopic topic = new ActiveMQTopic(topicName); msg.setProducerId(producerId);
MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
msg.setMessageId(id);
msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
ActiveMQTopic topic = null; ActiveMQTopic topic = null;
synchronized (activeMQTopicMap) { synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName()); topic = activeMQTopicMap.get(command.topicName());
@ -288,36 +391,48 @@ class MQTTProtocolConverter {
return msg; return msg;
} }
public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
PUBLISH result = new PUBLISH(); PUBLISH result = new PUBLISH();
String msgId = message.getJMSMessageID(); short id = (short) message.getMessageId().getProducerSequenceId();
int offset = msgId.lastIndexOf(':');
short id = 0;
if (offset > 0) {
Short.parseShort(msgId.substring(offset, msgId.length() - 1));
}
result.messageId(id); result.messageId(id);
QoS qoS;
if (message.propertyExists(QOS_PROPERTY_NAME)) {
int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
qoS = QoS.values()[ordinal];
UTF8Buffer topicName = null; } else {
qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
}
result.qos(qoS);
UTF8Buffer topicName;
synchronized (mqttTopicMap) { synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination()); topicName = mqttTopicMap.get(message.getJMSDestination());
if (topicName == null) { if (topicName == null) {
topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replaceAll(".", "/")); topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.', '/'));
mqttTopicMap.put(message.getJMSDestination(), topicName); mqttTopicMap.put(message.getJMSDestination(), topicName);
} }
} }
result.topicName(topicName); result.topicName(topicName);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { ByteSequence byteSequence = message.getContent();
if (message.isCompressed()) {
if (!message.isCompressed() && message.getContent() != null) { Inflater inflater = new Inflater();
ByteSequence msgContent = message.getContent(); inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
if (msgContent.getLength() > 4) { byte[] data = new byte[4096];
byte[] content = new byte[msgContent.getLength() - 4]; int read;
System.arraycopy(msgContent.data, 4, content, 0, content.length); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
result.payload(new Buffer(content)); while ((read = inflater.inflate(data, 0, data.length)) != 0) {
bytesOut.write(data, 0, read);
} }
byteSequence = bytesOut.toByteSequence();
}
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (byteSequence.getLength() > 4) {
byte[] content = new byte[byteSequence.getLength() - 4];
System.arraycopy(byteSequence.data, 4, content, 0, content.length);
result.payload(new Buffer(content));
} else { } else {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
String messageText = msg.getText(); String messageText = msg.getText();
@ -334,9 +449,11 @@ class MQTTProtocolConverter {
msg.readBytes(data); msg.readBytes(data);
result.payload(new Buffer(data)); result.payload(new Buffer(data));
} else { } else {
LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH"); if (byteSequence != null && byteSequence.getLength() > 0) {
result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
} }
return result.encode(); }
return result;
} }
@ -359,12 +476,8 @@ class MQTTProtocolConverter {
return rc; return rc;
} }
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
void configureInactivityMonitor(short heartBeat) {
protected void configureInactivityMonitor(short heartBeat) throws ProtocolException {
try { try {
int heartBeatMS = heartBeat * 1000; int heartBeatMS = heartBeat * 1000;
@ -372,18 +485,17 @@ class MQTTProtocolConverter {
monitor.setReadCheckTime(heartBeatMS); monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS); monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread(); monitor.startMonitorThread();
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
} }
LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
} }
protected void handleException(Throwable exception, MQTTFrame command) throws IOException { void handleException(Throwable exception, MQTTFrame command) {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail", exception); LOG.debug("Exception detail", exception);
@ -396,6 +508,12 @@ class MQTTProtocolConverter {
} }
} }
void checkConnected() throws MQTTProtocolException {
if (!connected.get()) {
throw new MQTTProtocolException("Not connected.");
}
}
private String getClientId() { private String getClientId() {
if (clientId == null) { if (clientId == null) {
if (connect != null && connect.clientId() != null) { if (connect != null && connect.clientId() != null) {
@ -414,4 +532,67 @@ class MQTTProtocolConverter {
LOG.debug("Failed to stop MQTT transport ", e); LOG.debug("Failed to stop MQTT transport ", e);
} }
} }
ResponseHandler createResponseHandler(final PUBLISH command) {
if (command != null) {
switch (command.qos()) {
case AT_LEAST_ONCE:
return new ResponseHandler() {
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
} else {
PUBACK ack = new PUBACK();
ack.messageId(command.messageId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
};
case EXACTLY_ONCE:
return new ResponseHandler() {
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
} else {
PUBACK ack = new PUBACK();
ack.messageId(command.messageId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
};
case AT_MOST_ONCE:
break;
}
}
/*
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
// We report back the failure but we don't close the connection.
Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, command);
} else {
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.RECEIPT);
sc.setHeaders(new HashMap<String, String>(1));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
stompTransport.sendToStomp(sc);
}
}
};
}
*/
return null;
}
private String convertMQTTToActiveMQ(String name) {
String result = name.replace('>', '#');
result = result.replace('*', '+');
result = result.replace('.', '/');
return result;
}
} }

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://mqtt.org/">MQTT</a> over SSL transport factory
*/
public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "mqtt";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -16,69 +16,39 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.MQTTFrame;
/** /**
* Keeps track of the STOMP subscription so that acking is correctly done. * Keeps track of the MQTT client subscription so that acking is correctly done.
*/ */
public class MQTTSubscription { class MQTTSubscription {
private final MQTTProtocolConverter protocolConverter;
private final ConsumerInfo consumerInfo;
private ActiveMQDestination destination;
private final QoS qos;
protected final MQTTProtocolConverter protocolConverter; public MQTTSubscription(MQTTProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
protected ActiveMQDestination destination;
protected String transformation;
public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = protocolConverter; this.protocolConverter = protocolConverter;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo; this.consumerInfo = consumerInfo;
this.transformation = transformation; this.qos = qos;
} }
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { MessageAck createMessageAck(MessageDispatch md) {
ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
/*
if (ackMode == CLIENT_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == INDIVIDUAL_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
*/
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getMQTTTransport().sendToActiveMQ(ack);
MQTTFrame command = protocolConverter.convertMessage(message); switch (qos) {
protocolConverter.getMQTTTransport().sendToMQTT(command); case AT_MOST_ONCE: {
return null;
} }
public String getSubscriptionId() {
return subscriptionId;
} }
return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
}
public void setDestination(ActiveMQDestination destination) { public void setDestination(ActiveMQDestination destination) {
this.destination = destination; this.destination = destination;

View File

@ -29,7 +29,7 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
/** /**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory * A <a href="http://mqtt.org/">MQTT</a> transport factory
*/ */
public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware { public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {

View File

@ -36,9 +36,7 @@ import org.slf4j.LoggerFactory;
* The StompTransportFilter normally sits on top of a TcpTransport that has been * The StompTransportFilter normally sits on top of a TcpTransport that has been
* configured with the StompWireFormat and is used to convert STOMP commands to * configured with the StompWireFormat and is used to convert STOMP commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the * ActiveMQ commands. All of the conversion work is done by delegating to the
* MQTTProtocolConverter. * MQTTProtocolConverter
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
@ -62,7 +60,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
try { try {
final Command command = (Command) o; final Command command = (Command) o;
protocolConverter.onActiveMQCommand(command); protocolConverter.onActiveMQCommand(command);
} catch (JMSException e) { } catch (Exception e) {
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }
} }

View File

@ -1,34 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
public class WildCardConvertor {
static String convertActiveMQToMQTT(String name) {
String result = name.replaceAll("#", ">");
result = result.replaceAll("+", "*");
result = result.replaceAll("/", ".");
return result;
}
static String convertMQTTToActiveMQ(String name) {
String result = name.replaceAll(">", "#");
result = result.replaceAll("*", "+");
result = result.replaceAll(".", "/");
return result;
}
}

View File

@ -19,7 +19,7 @@
</head> </head>
<body> <body>
An implementation of the MQTT 3.1 protocol - see http://mqtt.org/ A Broker side implementation of the MQTT 3.1 protocol - see http://mqtt.org/
</body> </body>
</html> </html>

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.MQTTSslTransportFactory

View File

@ -27,7 +27,7 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
// https://issues.apache.org/jira/browse/AMQ-3393
public class MQTTConnectTest { public class MQTTConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
BrokerService brokerService; BrokerService brokerService;

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Vector;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTSSLConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testConnect() throws Exception {
brokerService.addConnector("mqtt+ssl://localhost:8883");
brokerService.start();
MQTT mqtt = new MQTT();
mqtt.setHost("ssl://localhost:8883");
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Thread.sleep(1000);
connection.disconnect();
}
private static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}