added NIO support to the MQTT protocol for https://issues.apache.org/jira/browse/AMQ-3786

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1324714 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-04-11 11:52:06 +00:00
parent ff5e7e7626
commit 4c3843809e
16 changed files with 805 additions and 175 deletions

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.mqtt.codec.*;
public class MQTTCodec {
TcpTransport transport;
DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
boolean processedHeader = false;
String action;
byte header;
int contentLength = -1;
int previousByte = -1;
int payLoadRead = 0;
public MQTTCodec(TcpTransport transport) {
this.transport = transport;
}
public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
int i = 0;
byte b;
while (i++ < readSize) {
b = input.readByte();
// skip repeating nulls
if (!processedHeader && b == 0) {
previousByte = 0;
continue;
}
if (!processedHeader) {
i += processHeader(b, input);
if (contentLength == 0) {
processCommand();
}
} else {
if (contentLength == -1) {
// end of command reached, unmarshal
if (b == 0) {
processCommand();
} else {
currentCommand.write(b);
}
} else {
// read desired content length
if (payLoadRead == contentLength) {
processCommand();
i += processHeader(b, input);
} else {
currentCommand.write(b);
payLoadRead++;
}
}
}
previousByte = b;
}
if (processedHeader && payLoadRead == contentLength) {
processCommand();
}
}
/**
* sets the content length
*
* @return number of bytes read
*/
private int processHeader(byte header, DataByteArrayInputStream input) {
this.header = header;
byte digit;
int multiplier = 1;
int read = 0;
int length = 0;
do {
digit = input.readByte();
length += (digit & 0x7F) * multiplier;
multiplier <<= 7;
read++;
} while ((digit & 0x80) != 0);
contentLength = length;
processedHeader = true;
return read;
}
private void processCommand() throws Exception {
MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
transport.doConsume(frame);
processedHeader = false;
currentCommand.reset();
contentLength = -1;
payLoadRead = 0;
}
public static String commandType(byte header) throws IOException, JMSException {
byte messageType = (byte) ((header & 0xF0) >>> 4);
switch (messageType) {
case PINGREQ.TYPE: {
return "PINGREQ";
}
case CONNECT.TYPE: {
return "CONNECT";
}
case DISCONNECT.TYPE: {
return "DISCONNECT";
}
case SUBSCRIBE.TYPE: {
return "SUBSCRIBE";
}
case UNSUBSCRIBE.TYPE: {
return "UNSUBSCRIBE";
}
case PUBLISH.TYPE: {
return "PUBLISH";
}
case PUBACK.TYPE: {
return "PUBACK";
}
case PUBREC.TYPE: {
return "PUBREC";
}
case PUBREL.TYPE: {
return "PUBREL";
}
case PUBCOMP.TYPE: {
return "PUBCOMP";
}
default:
return "UNKNOWN";
}
}
}

View File

@ -62,6 +62,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean keepAliveResponseRequired;
private MQTTProtocolConverter protocolConverter;
private final Runnable readChecker = new Runnable() {
@ -125,6 +126,9 @@ public class MQTTInactivityMonitor extends TransportFilter {
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
if (protocolConverter != null) {
protocolConverter.onTransportError();
}
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
}
@ -225,6 +229,14 @@ public class MQTTInactivityMonitor extends TransportFilter {
return this.monitorStarted.get();
}
public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}
public MQTTProtocolConverter getProtocolConverter() {
return protocolConverter;
}
synchronized void startMonitorThread() {
if (monitorStarted.get()) {
return;

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
public class MQTTNIOSSLTransport extends NIOSSLTransport {
MQTTCodec codec;
public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
@Override
protected void initializeStreams() throws IOException {
codec = new MQTTCodec(this);
super.initializeStreams();
if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
serviceRead();
}
}
@Override
protected void processCommand(ByteBuffer plain) throws Exception {
byte[] fill = new byte[plain.remaining()];
plain.get(fill);
DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
SSLContext context;
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) {
transport.setSslContext(context);
}
return transport;
}
};
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new MQTTNIOSSLTransport(wf, socketFactory, location, localLocation);
}
@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
} catch (Exception e) {
throw new IOException(e);
}
}
return super.doBind(location);
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
*/
public class MQTTNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
MQTTCodec codec;
public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
channel.configureBlocking(false);
// listen for events telling us when the socket is readable.
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
public void onSelect(SelectorSelection selection) {
if (!isStopped()) {
serviceRead();
}
}
public void onError(SelectorSelection selection, Throwable error) {
if (error instanceof IOException) {
onException((IOException) error);
} else {
onException(IOExceptionSupport.create(error));
}
}
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
codec = new MQTTCodec(this);
}
private void serviceRead() {
try {
while (isStarted()) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
selection.close();
break;
}
// nothing more to read, break
if (readSize == 0) {
break;
}
inputBuffer.flip();
DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
codec.parse(dis, readSize);
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
protected void doStart() throws Exception {
connect();
selection.setInterestOps(SelectionKey.OP_READ);
selection.enable();
}
protected void doStop(ServiceStopper stopper) throws Exception {
try {
if (selection != null) {
selection.close();
}
} finally {
super.doStop(stopper);
}
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://mqtt.org/">MQTT</a> over NIO transport factory
*/
public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "mqtt";
}
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new MQTTNIOTransport(format, socket);
}
};
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -28,7 +28,6 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.stomp.ProtocolException;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@ -56,16 +55,14 @@ class MQTTProtocolConverter {
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object();
@ -143,6 +140,18 @@ class MQTTProtocolConverter {
onMQTTPubAck(new PUBACK().decode(frame));
break;
}
case PUBREC.TYPE: {
onMQTTPubRec(new PUBREC().decode(frame));
break;
}
case PUBREL.TYPE: {
onMQTTPubRel(new PUBREL().decode(frame));
break;
}
case PUBCOMP.TYPE: {
onMQTTPubComp(new PUBCOMP().decode(frame));
break;
}
default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
}
@ -150,10 +159,10 @@ class MQTTProtocolConverter {
}
void onMQTTConnect(final CONNECT connect) throws ProtocolException {
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
if (connected.get()) {
throw new ProtocolException("All ready connected.");
throw new MQTTProtocolException("All ready connected.");
}
this.connect = connect;
@ -268,7 +277,7 @@ class MQTTProtocolConverter {
consumerInfo.setSubscriptionName(connect.clientId().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo);
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
@ -327,13 +336,16 @@ class MQTTProtocolConverter {
MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage());
if (ack != null) {
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
if (ack != null && sub.expectAck()) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
}
}
getMQTTTransport().sendToMQTT(publish.encode());
if (ack != null && !sub.expectAck()) {
getMQTTTransport().sendToActiveMQ(ack);
}
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
@ -356,7 +368,38 @@ class MQTTProtocolConverter {
void onMQTTPubAck(PUBACK command) {
short messageId = command.messageId();
MessageAck ack = null;
MessageAck ack;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
}
if (ack != null) {
getMQTTTransport().sendToActiveMQ(ack);
}
}
void onMQTTPubRec(PUBREC commnand) {
//from a subscriber - send a PUBREL in response
PUBREL pubrel = new PUBREL();
pubrel.messageId(commnand.messageId());
sendToMQTT(pubrel.encode());
}
void onMQTTPubRel(PUBREL command) {
PUBREC ack;
synchronized (publisherRecs) {
ack = publisherRecs.remove(command.messageId());
}
if (ack == null) {
LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
}
PUBCOMP pubcomp = new PUBCOMP();
pubcomp.messageId(command.messageId());
sendToMQTT(pubcomp.encode());
}
void onMQTTPubComp(PUBCOMP command) {
short messageId = command.messageId();
MessageAck ack;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
}
@ -461,19 +504,24 @@ class MQTTProtocolConverter {
return mqttTransport;
}
public ActiveMQDestination createTempDestination(String name, boolean topic) {
ActiveMQDestination rc = tempDestinations.get(name);
if (rc == null) {
if (topic) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
} else {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
public void onTransportError() {
if (connect != null) {
if (connect.willTopic() != null && connect.willMessage() != null) {
try {
PUBLISH publish = new PUBLISH();
publish.topicName(connect.willTopic());
publish.qos(connect.willQos());
publish.payload(connect.willMessage());
ActiveMQMessage message = convertMessage(publish);
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, null);
} catch (Exception e) {
LOG.warn("Failed to publish Will Message " + connect.willMessage());
}
}
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
return rc;
}
@ -482,7 +530,7 @@ class MQTTProtocolConverter {
int heartBeatMS = heartBeat * 1000;
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
monitor.setProtocolConverter(this);
monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread();
@ -555,8 +603,11 @@ class MQTTProtocolConverter {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
} else {
PUBACK ack = new PUBACK();
PUBREC ack = new PUBREC();
ack.messageId(command.messageId());
synchronized (publisherRecs) {
publisherRecs.put(command.messageId(), ack);
}
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
@ -565,27 +616,6 @@ class MQTTProtocolConverter {
break;
}
}
/*
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
// We report back the failure but we don't close the connection.
Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, command);
} else {
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.RECEIPT);
sc.setHeaders(new HashMap<String, String>(1));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
stompTransport.sendToStomp(sc);
}
}
};
}
*/
return null;
}

View File

@ -16,11 +16,17 @@
*/
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.zip.DataFormatException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.PUBLISH;
/**
* Keeps track of the MQTT client subscription so that acking is correctly done.
@ -39,16 +45,20 @@ class MQTTSubscription {
}
MessageAck createMessageAck(MessageDispatch md) {
switch (qos) {
case AT_MOST_ONCE: {
return null;
}
}
return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
}
PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
PUBLISH publish = protocolConverter.convertMessage(message);
if (publish.qos().ordinal() > this.qos.ordinal()) {
publish.qos(this.qos);
}
return publish;
}
public boolean expectAck() {
return qos != QoS.AT_MOST_ONCE;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;

View File

@ -33,8 +33,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The StompTransportFilter normally sits on top of a TcpTransport that has been
* configured with the StompWireFormat and is used to convert STOMP commands to
* The MQTTTransportFilter normally sits on top of a TcpTransport that has been
* configured with the StompWireFormat and is used to convert MQTT commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the
* MQTTProtocolConverter
*/
@ -73,7 +73,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
protocolConverter.onMQTTCommand((MQTTFrame) command);
} catch (IOException e) {
onException(e);
handleException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
@ -129,5 +129,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
return this.wireFormat;
}
public void handleException(IOException e) {
protocolConverter.onTransportError();
super.onException(e);
}
}

View File

@ -36,7 +36,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
public class MQTTWireFormat implements WireFormat {
private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
private boolean encodingEnabled = false;
private int version = 1;
@ -79,8 +79,7 @@ public class MQTTWireFormat implements WireFormat {
public Object unmarshal(DataInput dataIn) throws IOException {
byte header = dataIn.readByte();
byte digit = 0;
byte digit;
int multiplier = 1;
int length = 0;
do {
@ -89,6 +88,7 @@ public class MQTTWireFormat implements WireFormat {
multiplier <<= 7;
}
while ((digit & 0x80) != 0);
if (length >= 0) {
if (length > MAX_MESSAGE_LENGTH) {
throw new IOException("The maximum message length was exceeded");

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.MQTTNIOTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.mqtt.MQTTNIOSSLTransportFactory

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.Vector;
import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public void startBroker() throws Exception {
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testConnect() throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
brokerService.start();
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Thread.sleep(1000);
connection.disconnect();
}
}

View File

@ -19,28 +19,15 @@ package org.apache.activemq.transport.mqtt;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Vector;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTSSLConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public class MQTTSSLTest extends MQTTTest {
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
@ -48,39 +35,23 @@ public class MQTTSSLConnectTest {
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
super.startBroker();
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testConnect() throws Exception {
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt+ssl://localhost:8883");
brokerService.start();
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("ssl://localhost:8883");
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Thread.sleep(1000);
connection.disconnect();
return mqtt;
}
private static class DefaultTrustManager implements X509TrustManager {
static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@ -92,4 +63,5 @@ public class MQTTSSLConnectTest {
return new X509Certificate[0];
}
}
}

View File

@ -36,13 +36,15 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;
@Before
public void startBroker() throws Exception {
@ -50,6 +52,7 @@ public class MQTTTest {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
this.numberOfMessages = 2000;
}
@After
@ -60,19 +63,40 @@ public class MQTTTest {
}
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector(brokerService);
brokerService.start();
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
Message message = connection.receive();
assertEquals(payload, new String(message.getPayload()));
}
connection.disconnect();
}
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector(brokerService);
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < 10000; i++) {
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Message message = connection.receive();
@ -83,13 +107,70 @@ public class MQTTTest {
}
@Test
public void testSendMQTTReceiveJMS() throws Exception {
public void testSendAndReceiveExactlyOnce() throws Exception {
addMQTTConnector(brokerService);
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
brokerService.addConnector("mqtt://localhost:1883");
pubConnection.connect();
MQTT subscriber = createMQTTConnection();
BlockingConnection subConnection = subscriber.blockingConnection();
subConnection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
subConnection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
Message message = subConnection.receive();
message.ack();
assertEquals(payload, new String(message.getPayload()));
}
subConnection.disconnect();
pubConnection.disconnect();
}
@Test
public void testSendAndReceiveLargeMessages() throws Exception {
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
addMQTTConnector(brokerService);
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
pubConnection.connect();
MQTT subscriber = createMQTTConnection();
BlockingConnection subConnection = subscriber.blockingConnection();
subConnection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
subConnection.subscribe(topics);
for (int i = 0; i < 10; i++) {
pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
Message message = subConnection.receive();
message.ack();
assertArrayEquals(payload, message.getPayload());
}
subConnection.disconnect();
pubConnection.disconnect();
}
@Test
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector(brokerService);
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
brokerService.start();
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo";
connection.connect();
@ -100,7 +181,7 @@ public class MQTTTest {
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
MessageConsumer consumer = s.createConsumer(jmsTopic);
for (int i = 0; i < 10000; i++) {
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
@ -113,5 +194,15 @@ public class MQTTTest {
connection.disconnect();
}
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
return mqtt;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.broker.BrokerService;
public class MTQQNioTest extends MQTTTest {
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1");
}
}