git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1304984 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-03-25 06:33:49 +00:00
parent 555d8890ac
commit cfc6917a79
17 changed files with 3455 additions and 0 deletions

View File

@ -96,6 +96,10 @@
<groupId>org.fusesource.fuse-extra</groupId>
<artifactId>fusemq-leveldb</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
<!-- =============================== -->
<!-- Optional Dependencies -->

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
private SchedulerTimerTask readCheckerTask;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean useKeepAlive = true;
private boolean keepAliveResponseRequired;
protected WireFormat wireFormat;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
public void run() {
long now = System.currentTimeMillis();
long elapsed = (now - lastRunTime);
if (lastRunTime != 0 && LOG.isDebugEnabled()) {
LOG.debug("" + elapsed + " ms elapsed since last read check.");
}
// Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between
// read checks to be small..
// If less than 90% of the read check Time elapsed then abort this readcheck.
if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
return;
}
lastRunTime = now;
readCheck();
}
};
private boolean allowReadCheck(long elapsed) {
return elapsed > (readCheckTime * 9 / 10);
}
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
this.wireFormat = wireFormat;
}
public void start() throws Exception {
next.start();
startMonitorThread();
}
public void stop() throws Exception {
stopMonitorThread();
next.stop();
}
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
}
return;
}
if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
}
;
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
}
}
commandReceived.set(false);
}
public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
sendLock.readLock().lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
} finally {
sendLock.readLock().unlock();
}
}
} else {
transportListener.onCommand(command);
}
} finally {
inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
// To prevent the inactivity monitor from sending a message while we
// are performing a send we take a read lock. The inactivity monitor
// sends its Heart-beat commands under a write lock. This means that
// the MutexTransport is still responsible for synchronizing sends
this.sendLock.readLock().lock();
inSend.set(true);
try {
doOnewaySend(o);
} finally {
commandSent.set(true);
inSend.set(false);
this.sendLock.readLock().unlock();
}
}
// Must be called under lock, either read or write on sendLock.
private void doOnewaySend(Object command) throws IOException {
if (failed.get()) {
throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
}
next.oneway(command);
}
public void onException(IOException error) {
if (failed.compareAndSet(false, true)) {
stopMonitorThread();
transportListener.onException(error);
}
}
public void setUseKeepAlive(boolean val) {
useKeepAlive = val;
}
public long getReadCheckTime() {
return readCheckTime;
}
public void setReadCheckTime(long readCheckTime) {
this.readCheckTime = readCheckTime;
}
public long getInitialDelayTime() {
return initialDelayTime;
}
public void setInitialDelayTime(long initialDelayTime) {
this.initialDelayTime = initialDelayTime;
}
public boolean isKeepAliveResponseRequired() {
return this.keepAliveResponseRequired;
}
public void setKeepAliveResponseRequired(boolean value) {
this.keepAliveResponseRequired = value;
}
public boolean isMonitorStarted() {
return this.monitorStarted.get();
}
protected synchronized void startMonitorThread() throws IOException {
if (monitorStarted.get()) {
return;
}
if (readCheckTime > 0) {
readCheckerTask = new SchedulerTimerTask(readChecker);
}
if (readCheckTime > 0) {
monitorStarted.set(true);
synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
ASYNC_TASKS = createExecutor();
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
}
CHECKER_COUNTER++;
if (readCheckTime > 0) {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
}
}
}
}
protected synchronized void stopMonitorThread() {
if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) {
readCheckerTask.cancel();
}
synchronized (AbstractInactivityMonitor.class) {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
ASYNC_TASKS.shutdown();
ASYNC_TASKS = null;
}
}
}
}
private ThreadFactory factory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
}

View File

@ -0,0 +1,417 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.stomp.FrameTranslator;
import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
import org.apache.activemq.transport.stomp.ProtocolException;
import org.apache.activemq.transport.stomp.StompSubscription;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class MQTTProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final String BROKER_VERSION;
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
static {
InputStream in = null;
String version = "5.6.0";
if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
try {
version = reader.readLine();
} catch (Exception e) {
}
}
BROKER_VERSION = version;
}
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final BrokerContext brokerContext;
private String version = "1.0";
ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect;
private String clientId;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
this.mqttTransport = mqttTransport;
this.brokerContext = brokerContext;
}
protected int generateCommandId() {
synchronized (commnadIdMutex) {
return lastCommandId++;
}
}
protected void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
mqttTransport.sendToActiveMQ(command);
}
/**
* Convert a MQTT command
*/
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
switch (frame.messageType()) {
case PINGREQ.TYPE: {
mqttTransport.sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
}
case CONNECT.TYPE: {
onMQTTConnect(new CONNECT().decode(frame));
break;
}
case DISCONNECT.TYPE: {
LOG.debug("MQTT Client " + getClientId() + " disconnecting");
stopTransport();
break;
}
default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
}
}
protected void onMQTTConnect(final CONNECT connect) throws ProtocolException {
if (connected.get()) {
throw new ProtocolException("All ready connected.");
}
this.connect = connect;
String clientId = "";
if (connect.clientId() != null) {
clientId = connect.clientId().toString();
}
String userName = "";
if (connect.userName() != null) {
userName = connect.userName().toString();
}
String passswd = "";
if (connect.password() != null) {
passswd = connect.password().toString();
}
configureInactivityMonitor(connect.keepAlive());
connectionInfo.setConnectionId(connectionId);
if (clientId != null && clientId.isEmpty() == false) {
connectionInfo.setClientId(clientId);
} else {
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
}
connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(userName);
connectionInfo.setPassword(passswd);
connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse) response).getException();
//let the client know
CONNACK ack = new CONNACK();
ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
getMQTTTransport().sendToMQTT(ack.encode());
getMQTTTransport().onException(IOExceptionSupport.create(exception));
return;
}
final SessionInfo sessionInfo = new SessionInfo(sessionId);
sendToActiveMQ(sessionInfo, null);
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse) response).getException();
CONNACK ack = new CONNACK();
ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
getMQTTTransport().sendToMQTT(ack.encode());
getMQTTTransport().onException(IOExceptionSupport.create(exception));
}
CONNACK ack = new CONNACK();
ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
getMQTTTransport().sendToMQTT(ack.encode());
}
});
}
});
}
/**
* Dispatch a ActiveMQ command
*/
public void onActiveMQCommand(Command command) throws IOException, JMSException {
if (command.isResponse()) {
Response response = (Response) command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
rh.onResponse(this, response);
} else {
// Pass down any unexpected errors. Should this close the connection?
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
handleException(exception, null);
}
}
} else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
//sub.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError) command).getException();
handleException(exception, null);
} else {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
}
}
public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
StringBuilder msgId = new StringBuilder();
msgId.append("ID:").append(getClientId()).append(":").append(command.messageId());
msg.setJMSMessageID(msgId.toString());
msg.setJMSPriority(4);
//ActiveMQTopic topic = new ActiveMQTopic(topicName);
ActiveMQTopic topic = null;
synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName());
if (topic == null) {
String topicName = command.topicName().toString().replaceAll("/", ".");
topic = new ActiveMQTopic(topicName);
activeMQTopicMap.put(command.topicName(), topic);
}
}
msg.setJMSDestination(topic);
msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
return msg;
}
public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
PUBLISH result = new PUBLISH();
String msgId = message.getJMSMessageID();
int offset = msgId.lastIndexOf(':');
short id = 0;
if (offset > 0) {
Short.parseShort(msgId.substring(offset, msgId.length() - 1));
}
result.messageId(id);
UTF8Buffer topicName = null;
synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination());
if (topicName == null) {
topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replaceAll(".", "/"));
mqttTopicMap.put(message.getJMSDestination(), topicName);
}
}
result.topicName(topicName);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (!message.isCompressed() && message.getContent() != null) {
ByteSequence msgContent = message.getContent();
if (msgContent.getLength() > 4) {
byte[] content = new byte[msgContent.getLength() - 4];
System.arraycopy(msgContent.data, 4, content, 0, content.length);
result.payload(new Buffer(content));
}
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
String messageText = msg.getText();
if (messageText != null) {
result.payload(new Buffer(msg.getText().getBytes("UTF-8")));
}
}
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
msg.setReadOnlyBody(true);
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
result.payload(new Buffer(data));
} else {
LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH");
}
return result.encode();
}
public MQTTTransport getMQTTTransport() {
return mqttTransport;
}
public ActiveMQDestination createTempDestination(String name, boolean topic) {
ActiveMQDestination rc = tempDestinations.get(name);
if (rc == null) {
if (topic) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
} else {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
}
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
return rc;
}
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
protected void configureInactivityMonitor(short heartBeat) throws ProtocolException {
try {
int heartBeatMS = heartBeat * 1000;
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread();
} catch (Exception ex) {
}
LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
}
protected void handleException(Throwable exception, MQTTFrame command) throws IOException {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail", exception);
}
try {
getMQTTTransport().stop();
} catch (Throwable e) {
LOG.error("Failed to stop MQTTT Transport ", e);
}
}
private String getClientId() {
if (clientId == null) {
if (connect != null && connect.clientId() != null) {
clientId = connect.clientId().toString();
}
} else {
clientId = "";
}
return clientId;
}
private void stopTransport() {
try {
getMQTTTransport().stop();
} catch (Throwable e) {
LOG.debug("Failed to stop MQTT transport ", e);
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
public class MQTTProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;
private final boolean fatal;
public MQTTProtocolException() {
this(null);
}
public MQTTProtocolException(String s) {
this(s, false);
}
public MQTTProtocolException(String s, boolean fatal) {
this(s, fatal, null);
}
public MQTTProtocolException(String s, boolean fatal, Throwable cause) {
super(s);
this.fatal = fatal;
initCause(cause);
}
public boolean isFatal() {
return fatal;
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.fusesource.mqtt.codec.MQTTFrame;
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
*/
public class MQTTSubscription {
protected final MQTTProtocolConverter protocolConverter;
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
protected ActiveMQDestination destination;
protected String transformation;
public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = protocolConverter;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo;
this.transformation = transformation;
}
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
/*
if (ackMode == CLIENT_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == INDIVIDUAL_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
*/
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getMQTTTransport().sendToActiveMQ(ack);
MQTTFrame command = protocolConverter.convertMessage(message);
protocolConverter.getMQTTTransport().sendToMQTT(command);
}
public String getSubscriptionId() {
return subscriptionId;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
public ActiveMQDestination getDestination() {
return destination;
}
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.security.cert.X509Certificate;
import org.apache.activemq.command.Command;
import org.fusesource.mqtt.codec.MQTTFrame;
/**
* Basic interface that mediates between protocol converter and transport
*/
public interface MQTTTransport {
public void sendToActiveMQ(Command command);
public void sendToMQTT(MQTTFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();
public void onException(IOException error);
public MQTTInactivityMonitor getInactivityMonitor();
public MQTTWireFormat getWireFormat();
public void stop() throws Exception;
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
*/
public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "mqtt";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.security.cert.X509Certificate;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The StompTransportFilter normally sits on top of a TcpTransport that has been
* configured with the StompWireFormat and is used to convert STOMP commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the
* MQTTProtocolConverter.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO");
private final MQTTProtocolConverter protocolConverter;
private MQTTInactivityMonitor monitor;
private MQTTWireFormat wireFormat;
private boolean trace;
public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
if (wireFormat instanceof MQTTWireFormat) {
this.wireFormat = (MQTTWireFormat) wireFormat;
}
}
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
protocolConverter.onActiveMQCommand(command);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
public void onCommand(Object command) {
try {
if (trace) {
TRACE.trace("Received: \n" + command);
}
protocolConverter.onMQTTCommand((MQTTFrame) command);
} catch (IOException e) {
onException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
}
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
if (l != null) {
l.onCommand(command);
}
}
public void sendToMQTT(MQTTFrame command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
}
Transport n = next;
if (n != null) {
n.oneway(command);
}
}
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
if (trace && peerCerts != null) {
LOG.debug("Peer Identity has been verified\n");
}
return peerCerts;
}
return null;
}
public boolean isTrace() {
return trace;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
@Override
public MQTTInactivityMonitor getInactivityMonitor() {
return monitor;
}
public void setInactivityMonitor(MQTTInactivityMonitor monitor) {
this.monitor = monitor;
}
@Override
public MQTTWireFormat getWireFormat() {
return this.wireFormat;
}
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.codec.MQTTFrame;
/**
* Implements marshalling and unmarsalling the <a
* href="http://mqtt.org/">MQTT</a> protocol.
*/
public class MQTTWireFormat implements WireFormat {
private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
private boolean encodingEnabled = false;
private int version = 1;
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return baos.toByteSequence();
}
public Object unmarshal(ByteSequence packet) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
public void marshal(Object command, DataOutput dataOut) throws IOException {
MQTTFrame frame = (MQTTFrame) command;
dataOut.write(frame.header());
int remaining = 0;
for (Buffer buffer : frame.buffers) {
remaining += buffer.length;
}
do {
byte digit = (byte) (remaining & 0x7F);
remaining >>>= 7;
if (remaining > 0) {
digit |= 0x80;
}
dataOut.write(digit);
} while (remaining > 0);
for (Buffer buffer : frame.buffers) {
dataOut.write(buffer.data, buffer.offset, buffer.length);
}
}
public Object unmarshal(DataInput dataIn) throws IOException {
byte header = dataIn.readByte();
byte digit = 0;
int multiplier = 1;
int length = 0;
do {
digit = dataIn.readByte();
length += (digit & 0x7F) * multiplier;
multiplier <<= 7;
}
while ((digit & 0x80) != 0);
if (length >= 0) {
if (length > MAX_MESSAGE_LENGTH) {
throw new IOException("The maximum message length was exceeded");
}
if (length > 0) {
byte[] data = new byte[length];
dataIn.readFully(data);
Buffer body = new Buffer(data);
return new MQTTFrame(body).header(header);
} else {
return new MQTTFrame().header(header);
}
}
return null;
}
/**
* @param the version of the wire format
*/
public void setVersion(int version) {
this.version = version;
}
/**
* @return the version of the wire format
*/
public int getVersion() {
return this.version;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class MQTTWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
return new MQTTWireFormat();
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import org.apache.activemq.command.Response;
/**
* Interface used by the MQTTProtocolConverter for callbacks.
*/
interface ResponseHandler {
void onResponse(MQTTProtocolConverter converter, Response response) throws IOException;
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
public class WildCardConvertor {
static String convertActiveMQToMQTT(String name) {
String result = name.replaceAll("#", ">");
result = result.replaceAll("+", "*");
result = result.replaceAll("/", ".");
return result;
}
static String convertMQTTToActiveMQ(String name) {
String result = name.replaceAll(">", "#");
result = result.replaceAll("*", "+");
result = result.replaceAll(".", "/");
return result;
}
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
An implementation of the MQTT 3.1 protocol - see http://mqtt.org/
</body>
</html>

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.MQTTTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.MQTTWireFormatFactory

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.Vector;
import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// https://issues.apache.org/jira/browse/AMQ-3393
public class MQTTConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public void startBroker() throws Exception {
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testConnect() throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
brokerService.start();
MQTT mqtt = new MQTT();
mqtt.setHost("localhost",1883);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Thread.sleep(1000);
connection.disconnect();
}
}

File diff suppressed because it is too large Load Diff