From cfc6917a79dd1c5ba3be70edf49a9a70393179ba Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sun, 25 Mar 2012 06:33:49 +0000 Subject: [PATCH] intial files to support MQTT -see https://issues.apache.org/jira/browse/AMQ-3786 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1304984 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 4 + .../transport/mqtt/MQTTInactivityMonitor.java | 294 +++ .../transport/mqtt/MQTTProtocolConverter.java | 417 ++++ .../transport/mqtt/MQTTProtocolException.java | 50 + .../transport/mqtt/MQTTSubscription.java | 94 + .../transport/mqtt/MQTTTransport.java | 43 + .../transport/mqtt/MQTTTransportFactory.java | 75 + .../transport/mqtt/MQTTTransportFilter.java | 135 ++ .../transport/mqtt/MQTTWireFormat.java | 124 + .../transport/mqtt/MQTTWireFormatFactory.java | 29 + .../transport/mqtt/ResponseHandler.java | 29 + .../transport/mqtt/WildCardConvertor.java | 34 + .../activemq/transport/mqtt/package.html | 25 + .../org/apache/activemq/transport/mqtt | 17 + .../org/apache/activemq/wireformat/mqtt | 17 + .../transport/mqtt/MQTTConnectTest.java | 66 + .../activemq/transport/mqtt/MQTTTest.java | 2002 +++++++++++++++++ 17 files changed, 3455 insertions(+) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 1fed2fba5b..a605744b4f 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -96,6 +96,10 @@ org.fusesource.fuse-extra fusemq-leveldb + + org.fusesource.mqtt-client + mqtt-client + diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java new file mode 100644 index 0000000000..1cfe382a33 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.util.Timer; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.transport.AbstractInactivityMonitor; +import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTInactivityMonitor extends TransportFilter { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class); + + private static ThreadPoolExecutor ASYNC_TASKS; + private static int CHECKER_COUNTER; + private static long DEFAULT_CHECK_TIME_MILLS = 30000; + private static Timer READ_CHECK_TIMER; + + private final AtomicBoolean monitorStarted = new AtomicBoolean(false); + + private final AtomicBoolean commandSent = new AtomicBoolean(false); + private final AtomicBoolean inSend = new AtomicBoolean(false); + private final AtomicBoolean failed = new AtomicBoolean(false); + + private final AtomicBoolean commandReceived = new AtomicBoolean(true); + private final AtomicBoolean inReceive = new AtomicBoolean(false); + private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); + + private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); + private SchedulerTimerTask readCheckerTask; + + private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; + private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; + private boolean useKeepAlive = true; + private boolean keepAliveResponseRequired; + + protected WireFormat wireFormat; + + private final Runnable readChecker = new Runnable() { + long lastRunTime; + + public void run() { + long now = System.currentTimeMillis(); + long elapsed = (now - lastRunTime); + + if (lastRunTime != 0 && LOG.isDebugEnabled()) { + LOG.debug("" + elapsed + " ms elapsed since last read check."); + } + + // Perhaps the timer executed a read check late.. and then executes + // the next read check on time which causes the time elapsed between + // read checks to be small.. + + // If less than 90% of the read check Time elapsed then abort this readcheck. + if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression. + LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); + return; + } + + lastRunTime = now; + readCheck(); + } + }; + + private boolean allowReadCheck(long elapsed) { + return elapsed > (readCheckTime * 9 / 10); + } + + + public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { + super(next); + this.wireFormat = wireFormat; + } + + public void start() throws Exception { + next.start(); + startMonitorThread(); + } + + public void stop() throws Exception { + stopMonitorThread(); + next.stop(); + } + + + final void readCheck() { + int currentCounter = next.getReceiveCounter(); + int previousCounter = lastReceiveCounter.getAndSet(currentCounter); + if (inReceive.get() || currentCounter != previousCounter) { + if (LOG.isTraceEnabled()) { + LOG.trace("A receive is in progress"); + } + return; + } + if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + } + ASYNC_TASKS.execute(new Runnable() { + public void run() { + onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); + } + + ; + }); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Message received since last read check, resetting flag: "); + } + } + commandReceived.set(false); + } + + + public void onCommand(Object command) { + commandReceived.set(true); + inReceive.set(true); + try { + if (command.getClass() == KeepAliveInfo.class) { + KeepAliveInfo info = (KeepAliveInfo) command; + if (info.isResponseRequired()) { + sendLock.readLock().lock(); + try { + info.setResponseRequired(false); + oneway(info); + } catch (IOException e) { + onException(e); + } finally { + sendLock.readLock().unlock(); + } + } + } else { + transportListener.onCommand(command); + } + } finally { + inReceive.set(false); + } + } + + public void oneway(Object o) throws IOException { + // To prevent the inactivity monitor from sending a message while we + // are performing a send we take a read lock. The inactivity monitor + // sends its Heart-beat commands under a write lock. This means that + // the MutexTransport is still responsible for synchronizing sends + this.sendLock.readLock().lock(); + inSend.set(true); + try { + doOnewaySend(o); + } finally { + commandSent.set(true); + inSend.set(false); + this.sendLock.readLock().unlock(); + } + } + + // Must be called under lock, either read or write on sendLock. + private void doOnewaySend(Object command) throws IOException { + if (failed.get()) { + throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); + } + next.oneway(command); + } + + public void onException(IOException error) { + if (failed.compareAndSet(false, true)) { + stopMonitorThread(); + transportListener.onException(error); + } + } + + public void setUseKeepAlive(boolean val) { + useKeepAlive = val; + } + + public long getReadCheckTime() { + return readCheckTime; + } + + public void setReadCheckTime(long readCheckTime) { + this.readCheckTime = readCheckTime; + } + + + public long getInitialDelayTime() { + return initialDelayTime; + } + + public void setInitialDelayTime(long initialDelayTime) { + this.initialDelayTime = initialDelayTime; + } + + public boolean isKeepAliveResponseRequired() { + return this.keepAliveResponseRequired; + } + + public void setKeepAliveResponseRequired(boolean value) { + this.keepAliveResponseRequired = value; + } + + public boolean isMonitorStarted() { + return this.monitorStarted.get(); + } + + protected synchronized void startMonitorThread() throws IOException { + if (monitorStarted.get()) { + return; + } + + + if (readCheckTime > 0) { + readCheckerTask = new SchedulerTimerTask(readChecker); + } + + + if (readCheckTime > 0) { + monitorStarted.set(true); + synchronized (AbstractInactivityMonitor.class) { + if (CHECKER_COUNTER == 0) { + ASYNC_TASKS = createExecutor(); + READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); + } + CHECKER_COUNTER++; + if (readCheckTime > 0) { + READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); + } + } + } + } + + + protected synchronized void stopMonitorThread() { + if (monitorStarted.compareAndSet(true, false)) { + if (readCheckerTask != null) { + readCheckerTask.cancel(); + } + + synchronized (AbstractInactivityMonitor.class) { + READ_CHECK_TIMER.purge(); + CHECKER_COUNTER--; + if (CHECKER_COUNTER == 0) { + READ_CHECK_TIMER.cancel(); + READ_CHECK_TIMER = null; + ASYNC_TASKS.shutdown(); + ASYNC_TASKS = null; + } + } + } + } + + private ThreadFactory factory = new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable); + thread.setDaemon(true); + return thread; + } + }; + + private ThreadPoolExecutor createExecutor() { + ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), factory); + exec.allowCoreThreadTimeOut(true); + return exec; + } +} + diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java new file mode 100644 index 0000000000..a72aded8d1 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -0,0 +1,417 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Destination; +import javax.jms.JMSException; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.command.*; +import org.apache.activemq.transport.stomp.FrameTranslator; +import org.apache.activemq.transport.stomp.LegacyFrameTranslator; +import org.apache.activemq.transport.stomp.ProtocolException; +import org.apache.activemq.transport.stomp.StompSubscription; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.LongSequenceGenerator; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PINGRESP; +import org.fusesource.mqtt.codec.PUBLISH; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class MQTTProtocolConverter { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); + + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); + + private static final String BROKER_VERSION; + private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); + + static { + InputStream in = null; + String version = "5.6.0"; + if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + try { + version = reader.readLine(); + } catch (Exception e) { + } + } + BROKER_VERSION = version; + } + + private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); + private final SessionId sessionId = new SessionId(connectionId, -1); + private final ProducerId producerId = new ProducerId(sessionId, 1); + + private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); + + private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); + private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + private final ConcurrentHashMap subscriptions = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); + private final Map transactions = new ConcurrentHashMap(); + private final Map activeMQTopicMap = new LRUCache(); + private final Map mqttTopicMap = new LRUCache(); + private final MQTTTransport mqttTransport; + + private final Object commnadIdMutex = new Object(); + private int lastCommandId; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); + private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); + private final BrokerContext brokerContext; + private String version = "1.0"; + ConnectionInfo connectionInfo = new ConnectionInfo(); + private CONNECT connect; + private String clientId; + + public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { + this.mqttTransport = mqttTransport; + this.brokerContext = brokerContext; + } + + protected int generateCommandId() { + synchronized (commnadIdMutex) { + return lastCommandId++; + } + } + + + protected void sendToActiveMQ(Command command, ResponseHandler handler) { + command.setCommandId(generateCommandId()); + if (handler != null) { + command.setResponseRequired(true); + resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); + } + mqttTransport.sendToActiveMQ(command); + } + + + /** + * Convert a MQTT command + */ + public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { + + + switch (frame.messageType()) { + case PINGREQ.TYPE: { + mqttTransport.sendToMQTT(PING_RESP_FRAME); + LOG.debug("Sent Ping Response to " + getClientId()); + break; + } + case CONNECT.TYPE: { + onMQTTConnect(new CONNECT().decode(frame)); + break; + } + case DISCONNECT.TYPE: { + LOG.debug("MQTT Client " + getClientId() + " disconnecting"); + stopTransport(); + break; + } + default: + handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); + } + + } + + + protected void onMQTTConnect(final CONNECT connect) throws ProtocolException { + + if (connected.get()) { + throw new ProtocolException("All ready connected."); + } + this.connect = connect; + + String clientId = ""; + if (connect.clientId() != null) { + clientId = connect.clientId().toString(); + } + + String userName = ""; + if (connect.userName() != null) { + userName = connect.userName().toString(); + } + String passswd = ""; + if (connect.password() != null) { + passswd = connect.password().toString(); + + } + + + configureInactivityMonitor(connect.keepAlive()); + + + connectionInfo.setConnectionId(connectionId); + if (clientId != null && clientId.isEmpty() == false) { + connectionInfo.setClientId(clientId); + } else { + connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); + } + + connectionInfo.setResponseRequired(true); + connectionInfo.setUserName(userName); + connectionInfo.setPassword(passswd); + connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); + + sendToActiveMQ(connectionInfo, new ResponseHandler() { + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + + if (response.isException()) { + // If the connection attempt fails we close the socket. + Throwable exception = ((ExceptionResponse) response).getException(); + //let the client know + CONNACK ack = new CONNACK(); + ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + getMQTTTransport().sendToMQTT(ack.encode()); + getMQTTTransport().onException(IOExceptionSupport.create(exception)); + return; + } + + final SessionInfo sessionInfo = new SessionInfo(sessionId); + sendToActiveMQ(sessionInfo, null); + + final ProducerInfo producerInfo = new ProducerInfo(producerId); + sendToActiveMQ(producerInfo, new ResponseHandler() { + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + + if (response.isException()) { + // If the connection attempt fails we close the socket. + Throwable exception = ((ExceptionResponse) response).getException(); + CONNACK ack = new CONNACK(); + ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); + getMQTTTransport().sendToMQTT(ack.encode()); + getMQTTTransport().onException(IOExceptionSupport.create(exception)); + } + + CONNACK ack = new CONNACK(); + ack.code(CONNACK.Code.CONNECTION_ACCEPTED); + getMQTTTransport().sendToMQTT(ack.encode()); + + } + }); + + } + }); + } + + + /** + * Dispatch a ActiveMQ command + */ + + + public void onActiveMQCommand(Command command) throws IOException, JMSException { + if (command.isResponse()) { + Response response = (Response) command; + ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); + if (rh != null) { + rh.onResponse(this, response); + } else { + // Pass down any unexpected errors. Should this close the connection? + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + handleException(exception, null); + } + } + } else if (command.isMessageDispatch()) { + MessageDispatch md = (MessageDispatch) command; + StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); + if (sub != null) { + //sub.onMessageDispatch(md); + } + } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { + // Pass down any unexpected async errors. Should this close the connection? + Throwable exception = ((ConnectionError) command).getException(); + handleException(exception, null); + } else { + LOG.debug("Do not know how to process ActiveMQ Command " + command); + } + } + + + public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException { + ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); + StringBuilder msgId = new StringBuilder(); + msgId.append("ID:").append(getClientId()).append(":").append(command.messageId()); + msg.setJMSMessageID(msgId.toString()); + msg.setJMSPriority(4); + + //ActiveMQTopic topic = new ActiveMQTopic(topicName); + ActiveMQTopic topic = null; + synchronized (activeMQTopicMap) { + topic = activeMQTopicMap.get(command.topicName()); + if (topic == null) { + String topicName = command.topicName().toString().replaceAll("/", "."); + topic = new ActiveMQTopic(topicName); + activeMQTopicMap.put(command.topicName(), topic); + } + } + msg.setJMSDestination(topic); + msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); + return msg; + } + + public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { + PUBLISH result = new PUBLISH(); + String msgId = message.getJMSMessageID(); + int offset = msgId.lastIndexOf(':'); + + short id = 0; + if (offset > 0) { + Short.parseShort(msgId.substring(offset, msgId.length() - 1)); + } + result.messageId(id); + + UTF8Buffer topicName = null; + synchronized (mqttTopicMap) { + topicName = mqttTopicMap.get(message.getJMSDestination()); + if (topicName == null) { + topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replaceAll(".", "/")); + mqttTopicMap.put(message.getJMSDestination(), topicName); + } + } + result.topicName(topicName); + + if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { + + if (!message.isCompressed() && message.getContent() != null) { + ByteSequence msgContent = message.getContent(); + if (msgContent.getLength() > 4) { + byte[] content = new byte[msgContent.getLength() - 4]; + System.arraycopy(msgContent.data, 4, content, 0, content.length); + result.payload(new Buffer(content)); + } + } else { + ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); + String messageText = msg.getText(); + if (messageText != null) { + result.payload(new Buffer(msg.getText().getBytes("UTF-8"))); + } + } + + } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { + + ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); + msg.setReadOnlyBody(true); + byte[] data = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + result.payload(new Buffer(data)); + } else { + LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH"); + } + return result.encode(); + } + + + public MQTTTransport getMQTTTransport() { + return mqttTransport; + } + + public ActiveMQDestination createTempDestination(String name, boolean topic) { + ActiveMQDestination rc = tempDestinations.get(name); + if (rc == null) { + if (topic) { + rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); + } else { + rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); + } + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + tempDestinations.put(name, rc); + tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); + } + return rc; + } + + public String getCreatedTempDestinationName(ActiveMQDestination destination) { + return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); + } + + + protected void configureInactivityMonitor(short heartBeat) throws ProtocolException { + try { + + int heartBeatMS = heartBeat * 1000; + MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); + + monitor.setReadCheckTime(heartBeatMS); + monitor.setInitialDelayTime(heartBeatMS); + + monitor.startMonitorThread(); + + } catch (Exception ex) { + + } + + LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); + } + + + protected void handleException(Throwable exception, MQTTFrame command) throws IOException { + LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Exception detail", exception); + } + + try { + getMQTTTransport().stop(); + } catch (Throwable e) { + LOG.error("Failed to stop MQTTT Transport ", e); + } + } + + private String getClientId() { + if (clientId == null) { + if (connect != null && connect.clientId() != null) { + clientId = connect.clientId().toString(); + } + } else { + clientId = ""; + } + return clientId; + } + + private void stopTransport() { + try { + getMQTTTransport().stop(); + } catch (Throwable e) { + LOG.debug("Failed to stop MQTT transport ", e); + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java new file mode 100644 index 0000000000..218effcd49 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; + + +public class MQTTProtocolException extends IOException { + + private static final long serialVersionUID = -2869735532997332242L; + + private final boolean fatal; + + public MQTTProtocolException() { + this(null); + } + + public MQTTProtocolException(String s) { + this(s, false); + } + + public MQTTProtocolException(String s, boolean fatal) { + this(s, fatal, null); + } + + public MQTTProtocolException(String s, boolean fatal, Throwable cause) { + super(s); + this.fatal = fatal; + initCause(cause); + } + + public boolean isFatal() { + return fatal; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java new file mode 100644 index 0000000000..ba6776c158 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedList; + +import javax.jms.JMSException; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.fusesource.mqtt.codec.MQTTFrame; + +/** + * Keeps track of the STOMP subscription so that acking is correctly done. + */ +public class MQTTSubscription { + + + protected final MQTTProtocolConverter protocolConverter; + protected final String subscriptionId; + protected final ConsumerInfo consumerInfo; + + protected final LinkedHashMap dispatchedMessage = new LinkedHashMap(); + protected final LinkedList unconsumedMessage = new LinkedList(); + + protected ActiveMQDestination destination; + protected String transformation; + + public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { + this.protocolConverter = protocolConverter; + this.subscriptionId = subscriptionId; + this.consumerInfo = consumerInfo; + this.transformation = transformation; + } + + void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { + ActiveMQMessage message = (ActiveMQMessage) md.getMessage(); + /* + if (ackMode == CLIENT_ACK) { + synchronized (this) { + dispatchedMessage.put(message.getMessageId(), md); + } + } else if (ackMode == INDIVIDUAL_ACK) { + synchronized (this) { + dispatchedMessage.put(message.getMessageId(), md); + } + } else if (ackMode == AUTO_ACK) { + MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + protocolConverter.getStompTransport().sendToActiveMQ(ack); + } + */ + MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + protocolConverter.getMQTTTransport().sendToActiveMQ(ack); + + MQTTFrame command = protocolConverter.convertMessage(message); + protocolConverter.getMQTTTransport().sendToMQTT(command); + } + + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setDestination(ActiveMQDestination destination) { + this.destination = destination; + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java new file mode 100644 index 0000000000..bed558533d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.security.cert.X509Certificate; + +import org.apache.activemq.command.Command; +import org.fusesource.mqtt.codec.MQTTFrame; + +/** + * Basic interface that mediates between protocol converter and transport + */ +public interface MQTTTransport { + + public void sendToActiveMQ(Command command); + + public void sendToMQTT(MQTTFrame command) throws IOException; + + public X509Certificate[] getPeerCertificates(); + + public void onException(IOException error); + + public MQTTInactivityMonitor getInactivityMonitor(); + + public MQTTWireFormat getWireFormat(); + + public void stop() throws Exception; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java new file mode 100644 index 0000000000..c82e00e587 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * A STOMP transport factory + */ +public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "mqtt"; + } + + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new MQTTTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + + @Override + protected Transport createInactivityMonitor(Transport transport, WireFormat format) { + MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format); + + MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class); + filter.setInactivityMonitor(monitor); + + return monitor; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java new file mode 100644 index 0000000000..5093421c53 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.security.cert.X509Certificate; + +import javax.jms.JMSException; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.tcp.SslTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The StompTransportFilter normally sits on top of a TcpTransport that has been + * configured with the StompWireFormat and is used to convert STOMP commands to + * ActiveMQ commands. All of the conversion work is done by delegating to the + * MQTTProtocolConverter. + * + * @author chirino + */ +public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { + private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); + private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO"); + private final MQTTProtocolConverter protocolConverter; + private MQTTInactivityMonitor monitor; + private MQTTWireFormat wireFormat; + + private boolean trace; + + public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { + super(next); + this.protocolConverter = new MQTTProtocolConverter(this, brokerContext); + + if (wireFormat instanceof MQTTWireFormat) { + this.wireFormat = (MQTTWireFormat) wireFormat; + } + } + + public void oneway(Object o) throws IOException { + try { + final Command command = (Command) o; + protocolConverter.onActiveMQCommand(command); + } catch (JMSException e) { + throw IOExceptionSupport.create(e); + } + } + + public void onCommand(Object command) { + try { + if (trace) { + TRACE.trace("Received: \n" + command); + } + + protocolConverter.onMQTTCommand((MQTTFrame) command); + } catch (IOException e) { + onException(e); + } catch (JMSException e) { + onException(IOExceptionSupport.create(e)); + } + } + + public void sendToActiveMQ(Command command) { + TransportListener l = transportListener; + if (l != null) { + l.onCommand(command); + } + } + + public void sendToMQTT(MQTTFrame command) throws IOException { + if (trace) { + TRACE.trace("Sending: \n" + command); + } + Transport n = next; + if (n != null) { + n.oneway(command); + } + } + + public X509Certificate[] getPeerCertificates() { + if (next instanceof SslTransport) { + X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); + if (trace && peerCerts != null) { + LOG.debug("Peer Identity has been verified\n"); + } + return peerCerts; + } + return null; + } + + public boolean isTrace() { + return trace; + } + + public void setTrace(boolean trace) { + this.trace = trace; + } + + @Override + public MQTTInactivityMonitor getInactivityMonitor() { + return monitor; + } + + public void setInactivityMonitor(MQTTInactivityMonitor monitor) { + this.monitor = monitor; + } + + @Override + public MQTTWireFormat getWireFormat() { + return this.wireFormat; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java new file mode 100644 index 0000000000..b446933034 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.mqtt.codec.MQTTFrame; + +/** + * Implements marshalling and unmarsalling the MQTT protocol. + */ +public class MQTTWireFormat implements WireFormat { + + + private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; + + private boolean encodingEnabled = false; + private int version = 1; + + public ByteSequence marshal(Object command) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + marshal(command, dos); + dos.close(); + return baos.toByteSequence(); + } + + public Object unmarshal(ByteSequence packet) throws IOException { + ByteArrayInputStream stream = new ByteArrayInputStream(packet); + DataInputStream dis = new DataInputStream(stream); + return unmarshal(dis); + } + + public void marshal(Object command, DataOutput dataOut) throws IOException { + MQTTFrame frame = (MQTTFrame) command; + dataOut.write(frame.header()); + + int remaining = 0; + for (Buffer buffer : frame.buffers) { + remaining += buffer.length; + } + do { + byte digit = (byte) (remaining & 0x7F); + remaining >>>= 7; + if (remaining > 0) { + digit |= 0x80; + } + dataOut.write(digit); + } while (remaining > 0); + for (Buffer buffer : frame.buffers) { + dataOut.write(buffer.data, buffer.offset, buffer.length); + } + } + + public Object unmarshal(DataInput dataIn) throws IOException { + byte header = dataIn.readByte(); + + byte digit = 0; + + int multiplier = 1; + int length = 0; + do { + digit = dataIn.readByte(); + length += (digit & 0x7F) * multiplier; + multiplier <<= 7; + } + while ((digit & 0x80) != 0); + if (length >= 0) { + if (length > MAX_MESSAGE_LENGTH) { + throw new IOException("The maximum message length was exceeded"); + } + + if (length > 0) { + byte[] data = new byte[length]; + dataIn.readFully(data); + Buffer body = new Buffer(data); + return new MQTTFrame(body).header(header); + } else { + return new MQTTFrame().header(header); + } + } + return null; + } + + /** + * @param the version of the wire format + */ + public void setVersion(int version) { + this.version = version; + } + + /** + * @return the version of the wire format + */ + public int getVersion() { + return this.version; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java new file mode 100644 index 0000000000..4beaa3915f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.wireformat.WireFormatFactory; + +/** + * Creates WireFormat objects that marshalls the Stomp protocol. + */ +public class MQTTWireFormatFactory implements WireFormatFactory { + public WireFormat createWireFormat() { + return new MQTTWireFormat(); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java new file mode 100644 index 0000000000..75c218abca --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; + +import org.apache.activemq.command.Response; + + +/** + * Interface used by the MQTTProtocolConverter for callbacks. + */ +interface ResponseHandler { + void onResponse(MQTTProtocolConverter converter, Response response) throws IOException; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java new file mode 100644 index 0000000000..9c922ec36c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +public class WildCardConvertor { + + static String convertActiveMQToMQTT(String name) { + String result = name.replaceAll("#", ">"); + result = result.replaceAll("+", "*"); + result = result.replaceAll("/", "."); + return result; + } + + static String convertMQTTToActiveMQ(String name) { + String result = name.replaceAll(">", "#"); + result = result.replaceAll("*", "+"); + result = result.replaceAll(".", "/"); + return result; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html new file mode 100644 index 0000000000..4f7aed8072 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html @@ -0,0 +1,25 @@ + + + + + + +An implementation of the MQTT 3.1 protocol - see http://mqtt.org/ + + + diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt new file mode 100644 index 0000000000..8f3e216675 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.mqtt.MQTTTransportFactory diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt new file mode 100644 index 0000000000..fab0c94f86 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.mqtt.MQTTWireFormatFactory \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java new file mode 100644 index 0000000000..b5207f0c78 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.util.Vector; + +import org.apache.activemq.broker.BrokerService; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// https://issues.apache.org/jira/browse/AMQ-3393 +public class MQTTConnectTest { + private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); + BrokerService brokerService; + Vector exceptions = new Vector(); + + @Before + public void startBroker() throws Exception { + exceptions.clear(); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testConnect() throws Exception { + + brokerService.addConnector("mqtt://localhost:1883"); + brokerService.start(); + MQTT mqtt = new MQTT(); + mqtt.setHost("localhost",1883); + BlockingConnection connection = mqtt.blockingConnection(); + + connection.connect(); + Thread.sleep(1000); + connection.disconnect(); + } + + +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java new file mode 100644 index 0000000000..9ba28dae5f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -0,0 +1,2002 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.transport.stomp.SamplePojo; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTTest extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); + + protected String bindAddress = "mqtt://localhost:1883"; + protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; + protected String jmsUri = "vm://localhost"; + + private BrokerService broker; + protected StompConnection stompConnection = new StompConnection(); + protected Connection connection; + protected Session session; + protected ActiveMQQueue queue; + private final String xmlObject = "\n" + + " Dejan\n" + + " Belgrade\n" + + ""; + + private String xmlMap = "\n" + + " \n" + + " name\n" + + " Dejan\n" + + " \n" + + " \n" + + " city\n" + + " Belgrade\n" + + " \n" + + "\n"; + + private final String jsonObject = "{\"pojo\":{" + + "\"name\":\"Dejan\"," + + "\"city\":\"Belgrade\"" + + "}}"; + + private String jsonMap = "{\"map\":{" + + "\"entry\":[" + + "{\"string\":[\"name\",\"Dejan\"]}," + + "{\"string\":[\"city\",\"Belgrade\"]}" + + "]" + + "}}"; + + @Override + protected void setUp() throws Exception { + // The order of the entries is different when using ibm jdk 5. + if (System.getProperty("java.vendor").equals("IBM Corporation") + && System.getProperty("java.version").startsWith("1.5")) { + xmlMap = "\n" + + " \n" + + " city\n" + + " Belgrade\n" + + " \n" + + " \n" + + " name\n" + + " Dejan\n" + + " \n" + + "\n"; + jsonMap = "{\"map\":{" + + "\"entry\":[" + + "{\"string\":[\"city\",\"Belgrade\"]}," + + "{\"string\":[\"name\",\"Dejan\"]}" + + "]" + + "}}"; + } + broker = BrokerFactory.createBroker(new URI(confUri)); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + broker.waitUntilStarted(); + + stompConnect(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri); + connection = cf.createConnection("system", "manager"); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = new ActiveMQQueue(getQueueName()); + connection.start(); + } + + protected void stompConnect() throws IOException, URISyntaxException, UnknownHostException { + URI connectUri = new URI(bindAddress); + stompConnection.open(createSocket(connectUri)); + } + + private void stompConnect(StompConnection connection) throws IOException, URISyntaxException, UnknownHostException { + URI connectUri = new URI(bindAddress); + connection.open(createSocket(connectUri)); + } + + protected Socket createSocket(URI connectUri) throws IOException { + return new Socket("127.0.0.1", connectUri.getPort()); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + + @Override + protected void tearDown() throws Exception { + try { + connection.close(); + stompDisconnect(); + } catch(Exception e) { + // Some tests explicitly disconnect from stomp so can ignore + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + + protected void stompDisconnect() throws IOException { + if (stompConnection != null) { + stompConnection.close(); + stompConnection = null; + } + } + + public void sendMessage(String msg) throws Exception { + sendMessage(msg, "foo", "xyz"); + } + + public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException { + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage(msg); + message.setStringProperty(propertyName, propertyValue); + producer.send(message); + } + + public void sendBytesMessage(byte[] msg) throws Exception { + MessageProducer producer = session.createProducer(queue); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(msg); + producer.send(message); + } + + public void testConnect() throws Exception { + + String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("response-id:1") >= 0); + + } + + public void testSendMessage() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + + // Make sure that the timestamp is valid - should + // be very close to the current time. + long tnow = System.currentTimeMillis(); + long tmsg = message.getJMSTimestamp(); + assertTrue(Math.abs(tnow - tmsg) < 1000); + } + + public void testJMSXGroupIdCanBeSet() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "JMSXGroupID: TEST\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID()); + } + + public void testSendMessageWithCustomHeadersAndSelector() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + assertEquals("foo", "abc", message.getStringProperty("foo")); + assertEquals("bar", "123", message.getStringProperty("bar")); + } + + public void testSendMessageWithDelay() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "AMQ_SCHEDULED_DELAY:5000\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2000); + assertNull(message); + message = (TextMessage)consumer.receive(5000); + assertNotNull(message); + } + + public void testSendMessageWithStandardHeaders() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "correlation-id:c123\n" + "priority:3\n" + "type:t345\n" + "JMSXGroupID:abc\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID()); + assertEquals("getJMSType", "t345", message.getJMSType()); + assertEquals("getJMSPriority", 3, message.getJMSPriority()); + assertEquals("foo", "abc", message.getStringProperty("foo")); + assertEquals("bar", "123", message.getStringProperty("bar")); + + assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); + ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message; + assertEquals("GroupID", "abc", amqMessage.getGroupID()); + } + + public void testSendMessageWithNoPriorityReceivesDefault() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "correlation-id:c123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + assertEquals("getJMSPriority", 4, message.getJMSPriority()); + } + + public void testReceipts() throws Exception { + + StompConnection receiver = new StompConnection(); + URI connectUri = new URI(bindAddress); + receiver.open(createSocket(connectUri)); + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + + frame = receiver.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + + frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = receiver.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Stomp Message does not contain receipt request", frame.indexOf(Stomp.Headers.RECEIPT_REQUESTED) == -1); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("RECEIPT")); + assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + + MessageConsumer consumer = session.createConsumer(queue); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("RECEIPT")); + assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + + TextMessage message = (TextMessage)consumer.receive(10000); + assertNotNull(message); + assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + + public void testSubscriptionReceipts() throws Exception { + final int done = 500; + int count = 0; + int receiptId = 0; + + URI connectUri = new URI(bindAddress); + + do { + + StompConnection sender = new StompConnection(); + sender.open(createSocket(connectUri)); + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + sender.sendFrame(frame); + + frame = sender.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" + Stomp.NULL; + sender.sendFrame(frame); + frame = sender.receiveFrame(); + assertTrue("" + frame, frame.startsWith("RECEIPT")); + + sender.disconnect(); + + StompConnection receiver = new StompConnection(); + receiver.open(createSocket(connectUri)); + + frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + + frame = receiver.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + + frame = receiver.receiveFrame(); + assertTrue("" + frame, frame.startsWith("RECEIPT")); + assertTrue("Receipt contains receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + frame = receiver.receiveFrame(); + assertTrue("" + frame, frame.startsWith("MESSAGE")); + + // remove suscription so we don't hang about and get next message + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL; + receiver.sendFrame(frame); + frame = receiver.receiveFrame(); + assertTrue("" + frame, frame.startsWith("RECEIPT")); + + receiver.disconnect(); + } while (count < done); + + } + + public void testSubscribeWithAutoAck() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + sendMessage(getName()); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithAutoAckAndBytesMessage() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + sendBytesMessage(new byte[] { + 1, 2, 3, 4, 5 + }); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); + Matcher clMmatcher = cl.matcher(frame); + assertTrue(clMmatcher.find()); + assertEquals("5", clMmatcher.group(1)); + + assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find()); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testBytesMessageWithNulls() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().startsWith("MESSAGE")); + + String length = message.getHeaders().get("content-length"); + assertEquals("5", length); + + assertEquals(5, message.getContent().length); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSendMultipleBytesMessages() throws Exception { + + final int MSG_COUNT = 50; + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + for( int ix = 0; ix < MSG_COUNT; ix++) { + frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + for( int ix = 0; ix < MSG_COUNT; ix++) { + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().startsWith("MESSAGE")); + + String length = message.getHeaders().get("content-length"); + assertEquals("5", length); + + assertEquals(5, message.getContent().length); + } + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithMessageSentWithProperties() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage("Hello World"); + message.setStringProperty("s", "value"); + message.setBooleanProperty("n", false); + message.setByteProperty("byte", (byte)9); + message.setDoubleProperty("d", 2.0); + message.setFloatProperty("f", (float)6.0); + message.setIntProperty("i", 10); + message.setLongProperty("l", 121); + message.setShortProperty("s", (short)12); + producer.send(message); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testMessagesAreInOrder() throws Exception { + int ctr = 10; + String[] data = new String[ctr]; + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + for (int i = 0; i < ctr; ++i) { + data[i] = getName() + i; + sendMessage(data[i]); + } + + for (int i = 0; i < ctr; ++i) { + frame = stompConnection.receiveFrame(); + assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); + } + + // sleep a while before publishing another set of messages + TimeUnit.SECONDS.sleep(2); + + for (int i = 0; i < ctr; ++i) { + data[i] = getName() + ":second:" + i; + sendMessage(data[i]); + } + + for (int i = 0; i < ctr; ++i) { + frame = stompConnection.receiveFrame(); + assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); + } + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithAutoAckAndSelector() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 'zzz'\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + sendMessage("Ignored message", "foo", "1234"); + sendMessage("Real message", "foo", "zzz"); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithAutoAckAndNumericSelector() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 42\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Ignored + frame = "SEND\n" + "foo:abc\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Matches + frame = "SEND\n" + "foo:42\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithAutoAckAndBooleanSelector() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = true\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Ignored + frame = "SEND\n" + "foo:false\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Matches + frame = "SEND\n" + "foo:true\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithAutoAckAnFloatSelector() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 3.14159\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Ignored + frame = "SEND\n" + "foo:6.578\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Matches + frame = "SEND\n" + "foo:3.14159\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithClientAck() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + sendMessage(getName()); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + stompDisconnect(); + + // message should be received since message was not acknowledged + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertTrue(message.getJMSRedelivered()); + } + + public void testSubscribeWithClientAckedAndContentLength() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + sendMessage(getName()); + StompFrame msg = stompConnection.receive(); + + + assertTrue(msg.getAction().equals("MESSAGE")); + + HashMap ackHeaders = new HashMap(); + ackHeaders.put("message-id", msg.getHeaders().get("message-id")); + ackHeaders.put("content-length", "8511"); + + StompFrame ack = new StompFrame("ACK", ackHeaders); + stompConnection.sendFrame(ack.format()); + + final QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount()); + return queueView.getDequeueCount() == 1; + } + })); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + stompDisconnect(); + + // message should not be received since it was acknowledged + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage)consumer.receive(500); + assertNull(message); + } + + public void testUnsubscribe() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // send a message to our queue + sendMessage("first message"); + + // receive message from socket + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + // remove suscription + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue("" + frame, frame.startsWith("RECEIPT")); + + // send a message to our queue + sendMessage("second message"); + + try { + frame = stompConnection.receiveFrame(); + LOG.info("Received frame: " + frame); + fail("No message should have been received since subscription was removed"); + } catch (SocketTimeoutException e) { + } + } + + public void testTransactionCommit() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(10000); + assertNotNull("Should have received a message", message); + } + + public void testTransactionRollback() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "first message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // rollback first message + frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // only second msg should be received since first msg was rolled back + TextMessage message = (TextMessage)consumer.receive(10000); + assertNotNull(message); + assertEquals("second message", message.getText().trim()); + } + + public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception { + assertClients(1); + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + assertClients(2); + + // now lets kill the stomp connection + stompConnection.close(); + + assertClients(1); + } + + public void testConnectNotAuthenticatedWrongUser() throws Exception { + String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + + assertTrue(f.startsWith("ERROR")); + assertClients(1); + } + + public void testConnectNotAuthenticatedWrongPassword() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + + assertTrue(f.startsWith("ERROR")); + assertClients(1); + } + + public void testSendNotAuthorized() throws Exception { + + String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("ERROR")); + } + + public void testSubscribeNotAuthorized() throws Exception { + + String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("ERROR")); + + } + + public void testTransformationUnknownTranslator() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test" + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + } + + public void testTransformationFailed() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR)); + assertEquals("Hello World", message.getText()); + } + + public void testTransformationSendXMLObject() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL; + + stompConnection.sendFrame(frame); + + ObjectMessage message = (ObjectMessage)consumer.receive(2500); + assertNotNull(message); + SamplePojo object = (SamplePojo)message.getObject(); + assertEquals("Dejan", object.getName()); + } + + public void testTransformationSendJSONObject() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL; + + stompConnection.sendFrame(frame); + + ObjectMessage message = (ObjectMessage)consumer.receive(2500); + assertNotNull(message); + SamplePojo object = (SamplePojo)message.getObject(); + assertEquals("Dejan", object.getName()); + } + + public void testTransformationSubscribeXML() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveJSONObject() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveXMLObject() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveObject() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveXMLObjectAndMap() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(objMessage); + + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setString("name", "Dejan"); + mapMessage.setString("city", "Belgrade"); + producer.send(mapMessage); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlObject)); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlMap.trim())); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveJSONObjectAndMap() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + producer.send(objMessage); + + MapMessage mapMessage = session.createMapMessage(); + mapMessage.setString("name", "Dejan"); + mapMessage.setString("city", "Belgrade"); + producer.send(mapMessage); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonObject)); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonMap.trim())); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationSendAndReceiveXmlMap() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertNotNull(frame); + assertTrue(frame.trim().endsWith(xmlMap.trim())); + assertTrue(frame.contains("jms-map-xml")); + } + + public void testTransformationSendAndReceiveJsonMap() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertNotNull(frame); + assertTrue(frame.trim().endsWith(jsonMap.trim())); + assertTrue(frame.contains("jms-map-json")); + } + + public void testTransformationReceiveBytesMessage() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[]{1, 2, 3, 4, 5}); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); + Matcher clMmatcher = cl.matcher(frame); + assertTrue(clMmatcher.find()); + assertEquals("5", clMmatcher.group(1)); + + assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find()); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationNotOverrideSubscription() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonObject)); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationIgnoreTransformation() throws Exception { + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); + message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.endsWith("\n\n")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationSendXMLMap() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + MapMessage message = (MapMessage) consumer.receive(2500); + assertNotNull(message); + assertEquals(message.getString("name"), "Dejan"); + } + + public void testTransformationSendJSONMap() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL; + + stompConnection.sendFrame(frame); + + MapMessage message = (MapMessage) consumer.receive(2500); + assertNotNull(message); + assertEquals(message.getString("name"), "Dejan"); + } + + public void testTransformationReceiveXMLMap() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + MapMessage message = session.createMapMessage(); + message.setString("name", "Dejan"); + message.setString("city", "Belgrade"); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(xmlMap.trim())); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransformationReceiveJSONMap() throws Exception { + + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); + MapMessage message = session.createMapMessage(); + message.setString("name", "Dejan"); + message.setString("city", "Belgrade"); + producer.send(message); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + + assertTrue(frame.trim().endsWith(jsonMap.trim())); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testDurableUnsub() throws Exception { + // get broker JMX view + + String domain = "org.apache.activemq"; + ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); + + BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); + + // connect + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + assertEquals(view.getDurableTopicSubscribers().length, 0); + + // subscribe + frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + // wait a bit for MBean to get refreshed + try { + Thread.sleep(400); + } catch (InterruptedException e){} + + assertEquals(view.getDurableTopicSubscribers().length, 1); + // disconnect + frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(400); + } catch (InterruptedException e){} + + //reconnect + stompConnect(); + // connect + frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // unsubscribe + frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(400); + } catch (InterruptedException e){} + assertEquals(view.getDurableTopicSubscribers().length, 0); + } + + public void testMessageIdHeader() throws Exception { + stompConnection.connect("system", "manager"); + + stompConnection.begin("tx1"); + stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null); + stompConnection.commit("tx1"); + + stompConnection.subscribe("/queue/" + getQueueName()); + StompFrame stompMessage = stompConnection.receive(); + assertNull(stompMessage.getHeaders().get("transaction")); + } + + public void testPrefetchSize() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + headers.put("activemq.prefetchSize", "1"); + stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); + + // send messages using JMS + sendMessage("message 1"); + sendMessage("message 2"); + sendMessage("message 3"); + sendMessage("message 4"); + sendMessage("message 5"); + + StompFrame frame = stompConnection.receive(); + assertEquals(frame.getBody(), "message 1"); + + stompConnection.begin("tx1"); + stompConnection.ack(frame, "tx1"); + + StompFrame frame1 = stompConnection.receive(); + assertEquals(frame1.getBody(), "message 2"); + + try { + StompFrame frame2 = stompConnection.receive(500); + if (frame2 != null) { + fail("Should not have received the second message"); + } + } catch (SocketTimeoutException soe) {} + + stompConnection.ack(frame1, "tx1"); + Thread.sleep(1000); + stompConnection.abort("tx1"); + + stompConnection.begin("tx2"); + + // Previously delivered message need to get re-acked... + stompConnection.ack(frame, "tx2"); + stompConnection.ack(frame1, "tx2"); + + StompFrame frame3 = stompConnection.receive(); + assertEquals(frame3.getBody(), "message 3"); + stompConnection.ack(frame3, "tx2"); + + StompFrame frame4 = stompConnection.receive(); + assertEquals(frame4.getBody(), "message 4"); + stompConnection.ack(frame4, "tx2"); + + stompConnection.commit("tx2"); + + stompConnection.begin("tx3"); + StompFrame frame5 = stompConnection.receive(); + assertEquals(frame5.getBody(), "message 5"); + stompConnection.ack(frame5, "tx3"); + stompConnection.commit("tx3"); + + stompDisconnect(); + } + + public void testTransactionsWithMultipleDestinations() throws Exception { + + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + headers.put("activemq.prefetchSize", "1"); + headers.put("activemq.exclusive", "true"); + + stompConnection.subscribe("/queue/test1", "client", headers); + + stompConnection.begin("ID:tx1"); + + headers.clear(); + headers.put("receipt", "ID:msg1"); + stompConnection.send("/queue/test2", "test message", "ID:tx1", headers); + + stompConnection.commit("ID:tx1"); + + // make sure connection is active after commit + Thread.sleep(1000); + stompConnection.send("/queue/test1", "another message"); + + StompFrame frame = stompConnection.receive(500); + assertNotNull(frame); + + stompConnection.disconnect(); + } + + public void testTempDestination() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/temp-queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(1000); + assertEquals("Hello World", message.getBody()); + } + + public void testJMSXUserIDIsSetInMessage() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(5000); + assertNotNull(message); + assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID)); + } + + public void testJMSXUserIDIsSetInStompMessage() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(5000); + assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID)); + } + + public void testClientSetMessageIdIsIgnored() throws Exception { + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Message.MESSAGE_ID, "Thisisnotallowed"); + headers.put(Stomp.Headers.Message.TIMESTAMP, "1234"); + headers.put(Stomp.Headers.Message.REDELIVERED, "true"); + headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed"); + headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed"); + + stompConnection.connect("system", "manager"); + + stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); + + stompConnection.subscribe("/queue/" + getQueueName()); + StompFrame stompMessage = stompConnection.receive(); + + Map mess_headers = new HashMap(); + mess_headers = stompMessage.getHeaders(); + + assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID) + )); + assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP))); + assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED)); + assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION)); + assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID)); + } + + public void testExpire() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + long timestamp = System.currentTimeMillis(); + headers.put(Stomp.Headers.Message.EXPIRATION_TIME, String.valueOf(timestamp)); + headers.put(Stomp.Headers.Send.PERSISTENT, "true"); + + stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); + + stompConnection.subscribe("/queue/ActiveMQ.DLQ"); + StompFrame stompMessage = stompConnection.receive(1000); + assertNotNull(stompMessage); + assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.ORIGINAL_DESTINATION), "/queue/" + getQueueName()); + } + + public void testDefaultJMSReplyToDest() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString"); + headers.put(Stomp.Headers.Send.PERSISTENT, "true"); + + stompConnection.send("/queue/" + getQueueName(), "msg-with-reply-to", null, headers); + + stompConnection.subscribe("/queue/" + getQueueName()); + StompFrame stompMessage = stompConnection.receive(1000); + assertNotNull(stompMessage); + assertEquals("" + stompMessage, stompMessage.getHeaders().get(Stomp.Headers.Send.REPLY_TO), "JustAString"); + } + + public void testPersistent() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Message.PERSISTENT, "true"); + + stompConnection.send("/queue/" + getQueueName(), "hello", null, headers); + + stompConnection.subscribe("/queue/" + getQueueName()); + + StompFrame stompMessage = stompConnection.receive(); + assertNotNull(stompMessage); + assertNotNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); + assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT), "true"); + } + + public void testPersistentDefaultValue() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + + stompConnection.send("/queue/" + getQueueName(), "hello", null, headers); + + stompConnection.subscribe("/queue/" + getQueueName()); + + StompFrame stompMessage = stompConnection.receive(); + assertNotNull(stompMessage); + assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); + } + + public void testReceiptNewQueue() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame receipt = stompConnection.receive(); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id")); + + frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + receipt = stompConnection.receive(); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id")); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().startsWith("MESSAGE")); + + String length = message.getHeaders().get("content-length"); + assertEquals("0", length); + assertEquals(0, message.getContent().length); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testTransactedClientAckBrokerStats() throws Exception { + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + sendMessage(getName()); + sendMessage(getName()); + + stompConnection.begin("tx1"); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + stompConnection.commit("tx1"); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + final QueueViewMBean queueView = getProxyToQueue(getQueueName()); + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return queueView.getDequeueCount() == 2; + } + }); + assertEquals(2, queueView.getDispatchCount()); + assertEquals(2, queueView.getDequeueCount()); + assertEquals(0, queueView.getQueueSize()); + } + + public void testReplytoModification() throws Exception { + String replyto = "some destination"; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "reply-to:" + replyto + "\n\nhello world" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + assertEquals(replyto, message.getHeaders().get("reply-to")); + + stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL); + } + + public void testReplyToDestinationNaming() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + doTestActiveMQReplyToTempDestination("topic"); + doTestActiveMQReplyToTempDestination("queue"); + } + + public void testSendNullBodyTextMessage() throws Exception { + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + sendMessage(null); + frame = stompConnection.receiveFrame(); + assertNotNull("Message not received", frame); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + private void doTestActiveMQReplyToTempDestination(String type) throws Exception { + LOG.info("Starting test on Temp Destinations using a temporary: " + type); + + final String dest = "/" + type + "/" + getQueueName(); + final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1", type); + LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest); + + // Subscribe both to the out-bound destination and the response tempt destination + stompConnection.subscribe(dest); + stompConnection.subscribe(tempDest); + + // Send a Message with the ReplyTo value set. + HashMap properties = new HashMap(); + properties.put(Stomp.Headers.Send.REPLY_TO, tempDest); + LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest)); + stompConnection.send(dest, "REQUEST", null, properties); + + // The subscription should receive a response with the ReplyTo property set. + StompFrame received = stompConnection.receive(); + assertNotNull(received); + String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO); + assertNotNull(remoteReplyTo); + assertTrue(remoteReplyTo.startsWith(String.format("/temp-%s/", type))); + LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo)); + + // Reply to the request using the given ReplyTo destination + stompConnection.send(remoteReplyTo, "RESPONSE"); + + // The response should be received by the Temporary Destination subscription + StompFrame reply = stompConnection.receive(); + assertNotNull(reply); + assertEquals("MESSAGE", reply.getAction()); + LOG.info(String.format("Response %s received", reply.getAction())); + + BrokerViewMBean broker = getProxyToBroker(); + if (type.equals("topic")) { + assertEquals(1, broker.getTemporaryTopics().length); + } else { + assertEquals(1, broker.getTemporaryQueues().length); + } + } + + public void testReplyToAcrossConnections() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + doReplyToAcrossConnections("topic"); + doReplyToAcrossConnections("queue"); + } + + private void doReplyToAcrossConnections(String type) throws Exception { + LOG.info("Starting test on Temp Destinations using a temporary: " + type); + + StompConnection responder = new StompConnection(); + stompConnect(responder); + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + responder.sendFrame(frame); + + frame = responder.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + final String dest = "/" + type + "/" + getQueueName(); + final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1:1:0:1", type); + LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest); + + // Subscribe to the temp destination, this is where we get our response. + stompConnection.subscribe(tempDest); + + // Subscribe to the Queue, this is where we get our request. + responder.subscribe(dest); + + // Send a Message with the ReplyTo value set. + HashMap properties = new HashMap(); + properties.put(Stomp.Headers.Send.REPLY_TO, tempDest); + properties.put(Stomp.Headers.RECEIPT_REQUESTED, "send-1"); + LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest)); + stompConnection.send(dest, "REQUEST", null, properties); + + frame = stompConnection.receiveFrame(); + assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT")); + assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + + + // The subscription should receive a response with the ReplyTo property set. + StompFrame received = responder.receive(); + assertNotNull(received); + String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO); + assertNotNull(remoteReplyTo); + assertTrue(remoteReplyTo.startsWith(String.format("/remote-temp-%s/", type))); + LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo)); + + // Reply to the request using the given ReplyTo destination + responder.send(remoteReplyTo, "RESPONSE"); + + // The response should be received by the Temporary Destination subscription + StompFrame reply = stompConnection.receive(); + assertNotNull(reply); + assertEquals("MESSAGE", reply.getAction()); + assertTrue(reply.getBody().contains("RESPONSE")); + LOG.info(String.format("Response %s received", reply.getAction())); + + BrokerViewMBean broker = getProxyToBroker(); + if (type.equals("topic")) { + assertEquals(1, broker.getTemporaryTopics().length); + } else { + assertEquals(1, broker.getTemporaryQueues().length); + } + } + + private BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException { + ObjectName brokerViewMBean = new ObjectName( + "org.apache.activemq:Type=Broker,BrokerName=localhost"); + BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext() + .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true); + return proxy; + } + + private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":Type=Queue,Destination=" + name + + ",BrokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + + protected void assertClients(final int expected) throws Exception { + Wait.waitFor(new Wait.Condition() + { + @Override + public boolean isSatisified() throws Exception { + return broker.getBroker().getClients().length == expected; + } + }); + org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); + int actual = clients.length; + + assertEquals("Number of clients", expected, actual); + } + + public void testDisconnectDoesNotDeadlockBroker() throws Exception { + for (int i = 0; i < 20; ++i) { + doTestConnectionLeak(); + } + } + + private void doTestConnectionLeak() throws Exception { + stompConnect(); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + boolean gotMessage = false; + boolean gotReceipt = false; + + char[] payload = new char[1024]; + Arrays.fill(payload, 'A'); + + String test = "SEND\n" + + "x-type:DEV-3485\n" + + "x-uuid:" + UUID.randomUUID() + "\n" + + "persistent:true\n" + + "receipt:" + UUID.randomUUID() + "\n" + + "destination:/queue/test.DEV-3485" + + "\n\n" + + new String(payload) + Stomp.NULL; + + frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + stompConnection.sendFrame(test); + + // We only want one of them, to trigger the shutdown and potentially + // see a deadlock. + while (!gotMessage && !gotReceipt) { + frame = stompConnection.receiveFrame(); + + LOG.debug("Received the frame: " + frame); + + if (frame.startsWith("RECEIPT")) { + gotReceipt = true; + } else if(frame.startsWith("MESSAGE")) { + gotMessage = true; + } else { + fail("Received a frame that we were not expecting."); + } + } + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + stompConnection.close(); + } +}