From 4c3843809eecd4948919e0c1690b283e0a69748b Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 11 Apr 2012 11:52:06 +0000 Subject: [PATCH] added NIO support to the MQTT protocol for https://issues.apache.org/jira/browse/AMQ-3786 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1324714 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/mqtt/MQTTCodec.java | 161 ++++++++++++++++++ .../transport/mqtt/MQTTInactivityMonitor.java | 12 ++ .../transport/mqtt/MQTTNIOSSLTransport.java | 59 +++++++ .../mqtt/MQTTNIOSSLTransportFactory.java | 69 ++++++++ .../transport/mqtt/MQTTNIOTransport.java | 132 ++++++++++++++ .../mqtt/MQTTNIOTransportFactory.java | 95 +++++++++++ .../transport/mqtt/MQTTProtocolConverter.java | 118 ++++++++----- .../transport/mqtt/MQTTSubscription.java | 24 ++- .../transport/mqtt/MQTTTransportFilter.java | 11 +- .../transport/mqtt/MQTTWireFormat.java | 6 +- .../org/apache/activemq/transport/mqtt+nio | 17 ++ .../apache/activemq/transport/mqtt+nio+ssl | 17 ++ .../transport/mqtt/MQTTConnectTest.java | 66 ------- ...TTSSLConnectTest.java => MQTTSSLTest.java} | 48 ++---- .../activemq/transport/mqtt/MQTTTest.java | 119 +++++++++++-- .../activemq/transport/mqtt/MTQQNioTest.java | 26 +++ 16 files changed, 805 insertions(+), 175 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl delete mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java rename activemq-core/src/test/java/org/apache/activemq/transport/mqtt/{MQTTSSLConnectTest.java => MQTTSSLTest.java} (69%) create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java new file mode 100644 index 0000000000..f3e16aa279 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; + +import javax.jms.JMSException; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.mqtt.codec.*; + +public class MQTTCodec { + + TcpTransport transport; + + DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream(); + boolean processedHeader = false; + String action; + byte header; + int contentLength = -1; + int previousByte = -1; + int payLoadRead = 0; + + public MQTTCodec(TcpTransport transport) { + this.transport = transport; + } + + public void parse(DataByteArrayInputStream input, int readSize) throws Exception { + int i = 0; + byte b; + while (i++ < readSize) { + b = input.readByte(); + // skip repeating nulls + if (!processedHeader && b == 0) { + previousByte = 0; + continue; + } + + if (!processedHeader) { + i += processHeader(b, input); + if (contentLength == 0) { + processCommand(); + } + + } else { + + if (contentLength == -1) { + // end of command reached, unmarshal + if (b == 0) { + processCommand(); + } else { + currentCommand.write(b); + } + } else { + // read desired content length + if (payLoadRead == contentLength) { + processCommand(); + i += processHeader(b, input); + } else { + currentCommand.write(b); + payLoadRead++; + } + } + } + + previousByte = b; + } + if (processedHeader && payLoadRead == contentLength) { + processCommand(); + } + } + + /** + * sets the content length + * + * @return number of bytes read + */ + private int processHeader(byte header, DataByteArrayInputStream input) { + this.header = header; + byte digit; + int multiplier = 1; + int read = 0; + int length = 0; + do { + digit = input.readByte(); + length += (digit & 0x7F) * multiplier; + multiplier <<= 7; + read++; + } while ((digit & 0x80) != 0); + + contentLength = length; + processedHeader = true; + return read; + } + + + private void processCommand() throws Exception { + MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header); + transport.doConsume(frame); + processedHeader = false; + currentCommand.reset(); + contentLength = -1; + payLoadRead = 0; + } + + public static String commandType(byte header) throws IOException, JMSException { + + byte messageType = (byte) ((header & 0xF0) >>> 4); + switch (messageType) { + case PINGREQ.TYPE: { + return "PINGREQ"; + } + case CONNECT.TYPE: { + return "CONNECT"; + } + case DISCONNECT.TYPE: { + return "DISCONNECT"; + } + case SUBSCRIBE.TYPE: { + return "SUBSCRIBE"; + } + case UNSUBSCRIBE.TYPE: { + return "UNSUBSCRIBE"; + } + case PUBLISH.TYPE: { + return "PUBLISH"; + } + case PUBACK.TYPE: { + return "PUBACK"; + } + case PUBREC.TYPE: { + return "PUBREC"; + } + case PUBREL.TYPE: { + return "PUBREL"; + } + case PUBCOMP.TYPE: { + return "PUBCOMP"; + } + default: + return "UNKNOWN"; + } + + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index 327568b48a..4f11ec5984 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -62,6 +62,7 @@ public class MQTTInactivityMonitor extends TransportFilter { private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; private boolean keepAliveResponseRequired; + private MQTTProtocolConverter protocolConverter; private final Runnable readChecker = new Runnable() { @@ -125,6 +126,9 @@ public class MQTTInactivityMonitor extends TransportFilter { } ASYNC_TASKS.execute(new Runnable() { public void run() { + if (protocolConverter != null) { + protocolConverter.onTransportError(); + } onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); } @@ -225,6 +229,14 @@ public class MQTTInactivityMonitor extends TransportFilter { return this.monitorStarted.get(); } + public void setProtocolConverter(MQTTProtocolConverter protocolConverter) { + this.protocolConverter = protocolConverter; + } + + public MQTTProtocolConverter getProtocolConverter() { + return protocolConverter; + } + synchronized void startMonitorThread() { if (monitorStarted.get()) { return; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java new file mode 100644 index 0000000000..ef18fe0740 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; + +import javax.net.SocketFactory; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.DataByteArrayInputStream; + +public class MQTTNIOSSLTransport extends NIOSSLTransport { + + MQTTCodec codec; + + public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + @Override + protected void initializeStreams() throws IOException { + codec = new MQTTCodec(this); + super.initializeStreams(); + if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) { + serviceRead(); + } + } + + @Override + protected void processCommand(ByteBuffer plain) throws Exception { + byte[] fill = new byte[plain.remaining()]; + plain.get(fill); + DataByteArrayInputStream dis = new DataByteArrayInputStream(fill); + codec.parse(dis, fill.length); + } + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java new file mode 100644 index 0000000000..b9dfaba51b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; + +public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory { + + SSLContext context; + + @Override + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket); + if (context != null) { + transport.setSslContext(context); + } + return transport; + } + }; + } + + @Override + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new MQTTNIOSSLTransport(wf, socketFactory, location, localLocation); + } + + @Override + public TransportServer doBind(URI location) throws IOException { + if (SslContext.getCurrentSslContext() != null) { + try { + context = SslContext.getCurrentSslContext().getSSLContext(); + } catch (Exception e) { + throw new IOException(e); + } + } + return super.doBind(location); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java new file mode 100644 index 0000000000..a85f5467d5 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import javax.net.SocketFactory; +import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.DataByteArrayInputStream; + +/** + * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO + */ +public class MQTTNIOTransport extends TcpTransport { + + private SocketChannel channel; + private SelectorSelection selection; + + private ByteBuffer inputBuffer; + MQTTCodec codec; + + public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + protected void initializeStreams() throws IOException { + channel = socket.getChannel(); + channel.configureBlocking(false); + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + if (!isStopped()) { + serviceRead(); + } + } + + public void onError(SelectorSelection selection, Throwable error) { + if (error instanceof IOException) { + onException((IOException) error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + inputBuffer = ByteBuffer.allocate(8 * 1024); + NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); + this.dataOut = new DataOutputStream(outPutStream); + this.buffOut = outPutStream; + codec = new MQTTCodec(this); + } + + private void serviceRead() { + try { + + while (isStarted()) { + // read channel + int readSize = channel.read(inputBuffer); + // channel is closed, cleanup + if (readSize == -1) { + onException(new EOFException()); + selection.close(); + break; + } + // nothing more to read, break + if (readSize == 0) { + break; + } + + inputBuffer.flip(); + DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array()); + codec.parse(dis, readSize); + + // clear the buffer + inputBuffer.clear(); + + } + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } + + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + try { + if (selection != null) { + selection.close(); + } + } finally { + super.doStop(stopper); + } + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java new file mode 100644 index 0000000000..f18e900387 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * A MQTT over NIO transport factory + */ +public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "mqtt"; + } + + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + return new MQTTNIOTransport(format, socket); + } + }; + } + + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new MQTTNIOTransport(wf, socketFactory, location, localLocation); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new MQTTTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + + protected Transport createInactivityMonitor(Transport transport, WireFormat format) { + MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format); + MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class); + filter.setInactivityMonitor(monitor); + return monitor; + } + +} + diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 245bd41947..41c3ea492a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -28,7 +28,6 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.command.*; -import org.apache.activemq.transport.stomp.ProtocolException; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; @@ -56,16 +55,14 @@ class MQTTProtocolConverter { private final ProducerId producerId = new ProducerId(sessionId, 1); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); - private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); - private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); - private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); private final Map activeMQTopicMap = new LRUCache(); private final Map mqttTopicMap = new LRUCache(); private final Map consumerAcks = new LRUCache(); + private final Map publisherRecs = new LRUCache(); private final MQTTTransport mqttTransport; private final Object commnadIdMutex = new Object(); @@ -143,6 +140,18 @@ class MQTTProtocolConverter { onMQTTPubAck(new PUBACK().decode(frame)); break; } + case PUBREC.TYPE: { + onMQTTPubRec(new PUBREC().decode(frame)); + break; + } + case PUBREL.TYPE: { + onMQTTPubRel(new PUBREL().decode(frame)); + break; + } + case PUBCOMP.TYPE: { + onMQTTPubComp(new PUBCOMP().decode(frame)); + break; + } default: handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); } @@ -150,10 +159,10 @@ class MQTTProtocolConverter { } - void onMQTTConnect(final CONNECT connect) throws ProtocolException { + void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { if (connected.get()) { - throw new ProtocolException("All ready connected."); + throw new MQTTProtocolException("All ready connected."); } this.connect = connect; @@ -268,7 +277,7 @@ class MQTTProtocolConverter { consumerInfo.setSubscriptionName(connect.clientId().toString()); } - MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo); + MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); subscriptionsByConsumerId.put(id, mqttSubscription); @@ -327,13 +336,16 @@ class MQTTProtocolConverter { MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); if (sub != null) { MessageAck ack = sub.createMessageAck(md); - PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage()); - if (ack != null) { + PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); + if (ack != null && sub.expectAck()) { synchronized (consumerAcks) { consumerAcks.put(publish.messageId(), ack); } } getMQTTTransport().sendToMQTT(publish.encode()); + if (ack != null && !sub.expectAck()) { + getMQTTTransport().sendToActiveMQ(ack); + } } } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { // Pass down any unexpected async errors. Should this close the connection? @@ -356,7 +368,38 @@ class MQTTProtocolConverter { void onMQTTPubAck(PUBACK command) { short messageId = command.messageId(); - MessageAck ack = null; + MessageAck ack; + synchronized (consumerAcks) { + ack = consumerAcks.remove(messageId); + } + if (ack != null) { + getMQTTTransport().sendToActiveMQ(ack); + } + } + + void onMQTTPubRec(PUBREC commnand) { + //from a subscriber - send a PUBREL in response + PUBREL pubrel = new PUBREL(); + pubrel.messageId(commnand.messageId()); + sendToMQTT(pubrel.encode()); + } + + void onMQTTPubRel(PUBREL command) { + PUBREC ack; + synchronized (publisherRecs) { + ack = publisherRecs.remove(command.messageId()); + } + if (ack == null) { + LOG.warn("Unknown PUBREL: " + command.messageId() + " received"); + } + PUBCOMP pubcomp = new PUBCOMP(); + pubcomp.messageId(command.messageId()); + sendToMQTT(pubcomp.encode()); + } + + void onMQTTPubComp(PUBCOMP command) { + short messageId = command.messageId(); + MessageAck ack; synchronized (consumerAcks) { ack = consumerAcks.remove(messageId); } @@ -461,19 +504,24 @@ class MQTTProtocolConverter { return mqttTransport; } - public ActiveMQDestination createTempDestination(String name, boolean topic) { - ActiveMQDestination rc = tempDestinations.get(name); - if (rc == null) { - if (topic) { - rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); - } else { - rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); + public void onTransportError() { + if (connect != null) { + if (connect.willTopic() != null && connect.willMessage() != null) { + try { + PUBLISH publish = new PUBLISH(); + publish.topicName(connect.willTopic()); + publish.qos(connect.willQos()); + publish.payload(connect.willMessage()); + ActiveMQMessage message = convertMessage(publish); + message.setProducerId(producerId); + message.onSend(); + sendToActiveMQ(message, null); + } catch (Exception e) { + LOG.warn("Failed to publish Will Message " + connect.willMessage()); + } + } - sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); - tempDestinations.put(name, rc); - tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); } - return rc; } @@ -482,7 +530,7 @@ class MQTTProtocolConverter { int heartBeatMS = heartBeat * 1000; MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); - + monitor.setProtocolConverter(this); monitor.setReadCheckTime(heartBeatMS); monitor.setInitialDelayTime(heartBeatMS); monitor.startMonitorThread(); @@ -555,8 +603,11 @@ class MQTTProtocolConverter { if (response.isException()) { LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); } else { - PUBACK ack = new PUBACK(); + PUBREC ack = new PUBREC(); ack.messageId(command.messageId()); + synchronized (publisherRecs) { + publisherRecs.put(command.messageId(), ack); + } converter.getMQTTTransport().sendToMQTT(ack.encode()); } } @@ -565,27 +616,6 @@ class MQTTProtocolConverter { break; } } - /* - final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); - if (receiptId != null) { - return new ResponseHandler() { - public void onResponse(ProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - // Generally a command can fail.. but that does not invalidate the connection. - // We report back the failure but we don't close the connection. - Throwable exception = ((ExceptionResponse)response).getException(); - handleException(exception, command); - } else { - StompFrame sc = new StompFrame(); - sc.setAction(Stomp.Responses.RECEIPT); - sc.setHeaders(new HashMap(1)); - sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); - stompTransport.sendToStomp(sc); - } - } - }; - } - */ return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java index 16105268c9..1636b4273d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.transport.mqtt; +import java.io.IOException; +import java.util.zip.DataFormatException; + +import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.codec.PUBLISH; /** * Keeps track of the MQTT client subscription so that acking is correctly done. @@ -39,16 +45,20 @@ class MQTTSubscription { } MessageAck createMessageAck(MessageDispatch md) { - - switch (qos) { - case AT_MOST_ONCE: { - return null; - } - - } return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); } + PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException { + PUBLISH publish = protocolConverter.convertMessage(message); + if (publish.qos().ordinal() > this.qos.ordinal()) { + publish.qos(this.qos); + } + return publish; + } + + public boolean expectAck() { + return qos != QoS.AT_MOST_ONCE; + } public void setDestination(ActiveMQDestination destination) { this.destination = destination; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index fe0aa92f35..f56bfb3235 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -33,8 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The StompTransportFilter normally sits on top of a TcpTransport that has been - * configured with the StompWireFormat and is used to convert STOMP commands to + * The MQTTTransportFilter normally sits on top of a TcpTransport that has been + * configured with the StompWireFormat and is used to convert MQTT commands to * ActiveMQ commands. All of the conversion work is done by delegating to the * MQTTProtocolConverter */ @@ -73,7 +73,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor protocolConverter.onMQTTCommand((MQTTFrame) command); } catch (IOException e) { - onException(e); + handleException(e); } catch (JMSException e) { onException(IOExceptionSupport.create(e)); } @@ -129,5 +129,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor return this.wireFormat; } + public void handleException(IOException e) { + protocolConverter.onTransportError(); + super.onException(e); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java index b446933034..0b3c55b950 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java @@ -36,7 +36,7 @@ import org.fusesource.mqtt.codec.MQTTFrame; public class MQTTWireFormat implements WireFormat { - private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; + static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; private boolean encodingEnabled = false; private int version = 1; @@ -79,8 +79,7 @@ public class MQTTWireFormat implements WireFormat { public Object unmarshal(DataInput dataIn) throws IOException { byte header = dataIn.readByte(); - byte digit = 0; - + byte digit; int multiplier = 1; int length = 0; do { @@ -89,6 +88,7 @@ public class MQTTWireFormat implements WireFormat { multiplier <<= 7; } while ((digit & 0x80) != 0); + if (length >= 0) { if (length > MAX_MESSAGE_LENGTH) { throw new IOException("The maximum message length was exceeded"); diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio new file mode 100644 index 0000000000..d957bd5a5f --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.mqtt.MQTTNIOTransportFactory diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl new file mode 100644 index 0000000000..e814d7bd02 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.mqtt.MQTTNIOSSLTransportFactory \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java deleted file mode 100644 index 9578bc3a9e..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import java.util.Vector; - -import org.apache.activemq.broker.BrokerService; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class MQTTConnectTest { - private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); - BrokerService brokerService; - Vector exceptions = new Vector(); - - @Before - public void startBroker() throws Exception { - exceptions.clear(); - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setAdvisorySupport(false); - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - @Test - public void testConnect() throws Exception { - - brokerService.addConnector("mqtt://localhost:1883"); - brokerService.start(); - MQTT mqtt = new MQTT(); - mqtt.setHost("localhost", 1883); - BlockingConnection connection = mqtt.blockingConnection(); - - connection.connect(); - Thread.sleep(1000); - connection.disconnect(); - } - - -} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java similarity index 69% rename from activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java rename to activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java index 9c79ffeccc..79265d0524 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java @@ -19,28 +19,15 @@ package org.apache.activemq.transport.mqtt; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -import java.util.Vector; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.apache.activemq.broker.BrokerService; -import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MQTTSSLConnectTest { - private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLConnectTest.class); - BrokerService brokerService; - Vector exceptions = new Vector(); - - @Before +public class MQTTSSLTest extends MQTTTest { public void startBroker() throws Exception { System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); System.setProperty("javax.net.ssl.trustStorePassword", "password"); @@ -48,39 +35,23 @@ public class MQTTSSLConnectTest { System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); System.setProperty("javax.net.ssl.keyStorePassword", "password"); System.setProperty("javax.net.ssl.keyStoreType", "jks"); - - exceptions.clear(); - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setAdvisorySupport(false); + super.startBroker(); } - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - @Test - public void testConnect() throws Exception { - + protected void addMQTTConnector(BrokerService brokerService) throws Exception { brokerService.addConnector("mqtt+ssl://localhost:8883"); - brokerService.start(); + } + + protected MQTT createMQTTConnection() throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost("ssl://localhost:8883"); SSLContext ctx = SSLContext.getInstance("TLS"); ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); mqtt.setSslContext(ctx); - BlockingConnection connection = mqtt.blockingConnection(); - - connection.connect(); - Thread.sleep(1000); - connection.disconnect(); + return mqtt; } - - private static class DefaultTrustManager implements X509TrustManager { + static class DefaultTrustManager implements X509TrustManager { public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { } @@ -92,4 +63,5 @@ public class MQTTSSLConnectTest { return new X509Certificate[0]; } } -} \ No newline at end of file + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index d1b9fb9728..ca4f183171 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -36,13 +36,15 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class MQTTTest { - private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); - BrokerService brokerService; - Vector exceptions = new Vector(); + protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); + protected BrokerService brokerService; + protected Vector exceptions = new Vector(); + protected int numberOfMessages; @Before public void startBroker() throws Exception { @@ -50,6 +52,7 @@ public class MQTTTest { brokerService = new BrokerService(); brokerService.setPersistent(false); brokerService.setAdvisorySupport(false); + this.numberOfMessages = 2000; } @After @@ -60,19 +63,40 @@ public class MQTTTest { } @Test - public void testSendAndReceiveAtLeastOnce() throws Exception { - - brokerService.addConnector("mqtt://localhost:1883"); + public void testSendAndReceiveAtMostOnce() throws Exception { + addMQTTConnector(brokerService); brokerService.start(); - MQTT mqtt = new MQTT(); - mqtt.setHost("localhost", 1883); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive(Short.MAX_VALUE); + BlockingConnection connection = mqtt.blockingConnection(); + + connection.connect(); + + + Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)}; + connection.subscribe(topics); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false); + Message message = connection.receive(); + assertEquals(payload, new String(message.getPayload())); + } + connection.disconnect(); + } + + @Test + public void testSendAndReceiveAtLeastOnce() throws Exception { + addMQTTConnector(brokerService); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive(Short.MAX_VALUE); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; connection.subscribe(topics); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); Message message = connection.receive(); @@ -83,13 +107,70 @@ public class MQTTTest { } @Test - public void testSendMQTTReceiveJMS() throws Exception { + public void testSendAndReceiveExactlyOnce() throws Exception { + addMQTTConnector(brokerService); + brokerService.start(); + MQTT publisher = createMQTTConnection(); + BlockingConnection pubConnection = publisher.blockingConnection(); - brokerService.addConnector("mqtt://localhost:1883"); + pubConnection.connect(); + + MQTT subscriber = createMQTTConnection(); + BlockingConnection subConnection = subscriber.blockingConnection(); + + subConnection.connect(); + + Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)}; + subConnection.subscribe(topics); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false); + Message message = subConnection.receive(); + message.ack(); + assertEquals(payload, new String(message.getPayload())); + } + subConnection.disconnect(); + pubConnection.disconnect(); + } + + @Test + public void testSendAndReceiveLargeMessages() throws Exception { + byte[] payload = new byte[1024 * 32]; + for (int i = 0; i < payload.length; i++){ + payload[i] = '2'; + } + addMQTTConnector(brokerService); + brokerService.start(); + + MQTT publisher = createMQTTConnection(); + BlockingConnection pubConnection = publisher.blockingConnection(); + + pubConnection.connect(); + + MQTT subscriber = createMQTTConnection(); + BlockingConnection subConnection = subscriber.blockingConnection(); + + subConnection.connect(); + + Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; + subConnection.subscribe(topics); + for (int i = 0; i < 10; i++) { + pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false); + Message message = subConnection.receive(); + message.ack(); + assertArrayEquals(payload, message.getPayload()); + } + subConnection.disconnect(); + pubConnection.disconnect(); + } + + + @Test + public void testSendMQTTReceiveJMS() throws Exception { + addMQTTConnector(brokerService); brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); brokerService.start(); - MQTT mqtt = new MQTT(); - mqtt.setHost("localhost", 1883); + MQTT mqtt = createMQTTConnection(); BlockingConnection connection = mqtt.blockingConnection(); final String DESTINATION_NAME = "foo"; connection.connect(); @@ -100,7 +181,7 @@ public class MQTTTest { javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); MessageConsumer consumer = s.createConsumer(jmsTopic); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); @@ -113,5 +194,15 @@ public class MQTTTest { connection.disconnect(); } + protected void addMQTTConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("mqtt://localhost:1883"); + } + + protected MQTT createMQTTConnection() throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setHost("localhost", 1883); + return mqtt; + } + } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java new file mode 100644 index 0000000000..74cfeb28ad --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.broker.BrokerService; + +public class MTQQNioTest extends MQTTTest { + protected void addMQTTConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1"); + } + +}