From 0f5b406b43a59ba0942638e0e8018ea8aade8ca6 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 4 Apr 2012 19:46:26 +0000 Subject: [PATCH] more functionality for MQTT for https://issues.apache.org/jira/browse/AMQ-3786 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1309566 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/mqtt/MQTTInactivityMonitor.java | 10 +- .../transport/mqtt/MQTTProtocolConverter.java | 347 ++- .../mqtt/MQTTSslTransportFactory.java | 76 + .../transport/mqtt/MQTTSubscription.java | 62 +- .../transport/mqtt/MQTTTransportFactory.java | 2 +- .../transport/mqtt/MQTTTransportFilter.java | 6 +- .../transport/mqtt/WildCardConvertor.java | 34 - .../activemq/transport/mqtt/package.html | 2 +- .../org/apache/activemq/transport/mqtt+ssl | 17 + .../transport/mqtt/MQTTConnectTest.java | 6 +- .../transport/mqtt/MQTTSSLConnectTest.java | 95 + .../activemq/transport/mqtt/MQTTTest.java | 2035 +---------------- 12 files changed, 552 insertions(+), 2140 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index 1cfe382a33..327568b48a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -61,10 +61,8 @@ public class MQTTInactivityMonitor extends TransportFilter { private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; - private boolean useKeepAlive = true; private boolean keepAliveResponseRequired; - protected WireFormat wireFormat; private final Runnable readChecker = new Runnable() { long lastRunTime; @@ -99,7 +97,6 @@ public class MQTTInactivityMonitor extends TransportFilter { public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { super(next); - this.wireFormat = wireFormat; } public void start() throws Exception { @@ -198,9 +195,6 @@ public class MQTTInactivityMonitor extends TransportFilter { } } - public void setUseKeepAlive(boolean val) { - useKeepAlive = val; - } public long getReadCheckTime() { return readCheckTime; @@ -231,7 +225,7 @@ public class MQTTInactivityMonitor extends TransportFilter { return this.monitorStarted.get(); } - protected synchronized void startMonitorThread() throws IOException { + synchronized void startMonitorThread() { if (monitorStarted.get()) { return; } @@ -258,7 +252,7 @@ public class MQTTInactivityMonitor extends TransportFilter { } - protected synchronized void stopMonitorThread() { + synchronized void stopMonitorThread() { if (monitorStarted.compareAndSet(true, false)) { if (readCheckerTask != null) { readCheckerTask.cancel(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index a72aded8d1..245bd41947 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -16,37 +16,30 @@ */ package org.apache.activemq.transport.mqtt; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.command.*; -import org.apache.activemq.transport.stomp.FrameTranslator; -import org.apache.activemq.transport.stomp.LegacyFrameTranslator; import org.apache.activemq.transport.stomp.ProtocolException; -import org.apache.activemq.transport.stomp.StompSubscription; +import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.LongSequenceGenerator; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; -import org.fusesource.mqtt.codec.CONNACK; -import org.fusesource.mqtt.codec.CONNECT; -import org.fusesource.mqtt.codec.DISCONNECT; -import org.fusesource.mqtt.codec.MQTTFrame; -import org.fusesource.mqtt.codec.PINGREQ; -import org.fusesource.mqtt.codec.PINGRESP; -import org.fusesource.mqtt.codec.PUBLISH; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.codec.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,66 +48,46 @@ class MQTTProtocolConverter { private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); - - private static final String BROKER_VERSION; private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); - static { - InputStream in = null; - String version = "5.6.0"; - if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - try { - version = reader.readLine(); - } catch (Exception e) { - } - } - BROKER_VERSION = version; - } private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final SessionId sessionId = new SessionId(connectionId, -1); private final ProducerId producerId = new ProducerId(sessionId, 1); - - private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); - private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); - private final ConcurrentHashMap subscriptions = new ConcurrentHashMap(); + private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); - private final Map transactions = new ConcurrentHashMap(); private final Map activeMQTopicMap = new LRUCache(); private final Map mqttTopicMap = new LRUCache(); + private final Map consumerAcks = new LRUCache(); private final MQTTTransport mqttTransport; private final Object commnadIdMutex = new Object(); private int lastCommandId; private final AtomicBoolean connected = new AtomicBoolean(false); - private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); - private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); - private final BrokerContext brokerContext; - private String version = "1.0"; - ConnectionInfo connectionInfo = new ConnectionInfo(); + private ConnectionInfo connectionInfo = new ConnectionInfo(); private CONNECT connect; private String clientId; + private final String QOS_PROPERTY_NAME = "QoSPropertyName"; public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) { this.mqttTransport = mqttTransport; - this.brokerContext = brokerContext; } - protected int generateCommandId() { + int generateCommandId() { synchronized (commnadIdMutex) { return lastCommandId++; } } - protected void sendToActiveMQ(Command command, ResponseHandler handler) { + void sendToActiveMQ(Command command, ResponseHandler handler) { command.setCommandId(generateCommandId()); if (handler != null) { command.setResponseRequired(true); @@ -123,6 +96,14 @@ class MQTTProtocolConverter { mqttTransport.sendToActiveMQ(command); } + void sendToMQTT(MQTTFrame frame) { + try { + mqttTransport.sendToMQTT(frame); + } catch (IOException e) { + LOG.warn("Failed to send frame " + frame, e); + } + } + /** * Convert a MQTT command @@ -138,6 +119,7 @@ class MQTTProtocolConverter { } case CONNECT.TYPE: { onMQTTConnect(new CONNECT().decode(frame)); + LOG.debug("MQTT Client " + getClientId() + " connected."); break; } case DISCONNECT.TYPE: { @@ -145,6 +127,22 @@ class MQTTProtocolConverter { stopTransport(); break; } + case SUBSCRIBE.TYPE: { + onSubscribe(new SUBSCRIBE().decode(frame)); + break; + } + case UNSUBSCRIBE.TYPE: { + onUnSubscribe(new UNSUBSCRIBE().decode(frame)); + break; + } + case PUBLISH.TYPE: { + onMQTTPublish(new PUBLISH().decode(frame)); + break; + } + case PUBACK.TYPE: { + onMQTTPubAck(new PUBACK().decode(frame)); + break; + } default: handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); } @@ -152,7 +150,7 @@ class MQTTProtocolConverter { } - protected void onMQTTConnect(final CONNECT connect) throws ProtocolException { + void onMQTTConnect(final CONNECT connect) throws ProtocolException { if (connected.get()) { throw new ProtocolException("All ready connected."); @@ -171,15 +169,13 @@ class MQTTProtocolConverter { String passswd = ""; if (connect.password() != null) { passswd = connect.password().toString(); - } - configureInactivityMonitor(connect.keepAlive()); connectionInfo.setConnectionId(connectionId); - if (clientId != null && clientId.isEmpty() == false) { + if (clientId != null && !clientId.isEmpty()) { connectionInfo.setClientId(clientId); } else { connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); @@ -222,6 +218,7 @@ class MQTTProtocolConverter { CONNACK ack = new CONNACK(); ack.code(CONNACK.Code.CONNECTION_ACCEPTED); + connected.set(true); getMQTTTransport().sendToMQTT(ack.encode()); } @@ -231,13 +228,88 @@ class MQTTProtocolConverter { }); } + void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { + checkConnected(); + SUBACK result = new SUBACK(); + Topic[] topics = command.topics(); + if (topics != null) { + byte[] qos = new byte[topics.length]; + for (int i = 0; i < topics.length; i++) { + qos[i] = (byte) onSubscribe(command, topics[i]).ordinal(); + } + SUBACK ack = new SUBACK(); + ack.messageId(command.messageId()); + ack.grantedQos(qos); + try { + getMQTTTransport().sendToMQTT(ack.encode()); + } catch (IOException e) { + LOG.warn("Couldn't send SUBACK for " + command, e); + } + } else { + LOG.warn("No topics defined for Subscription " + command); + } + } + + QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); + + + if (destination == null) { + throw new MQTTProtocolException("Invalid Destination."); + } + + ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + ConsumerInfo consumerInfo = new ConsumerInfo(id); + consumerInfo.setDestination(destination); + consumerInfo.setPrefetchSize(1000); + consumerInfo.setDispatchAsync(true); + if (!connect.cleanSession() && (connect.clientId() != null)) { + //by default subscribers are persistent + consumerInfo.setSubscriptionName(connect.clientId().toString()); + } + + MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo); + + + subscriptionsByConsumerId.put(id, mqttSubscription); + mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); + + sendToActiveMQ(consumerInfo, null); + return topic.qos(); + } + + void onUnSubscribe(UNSUBSCRIBE command) { + UTF8Buffer[] topics = command.topics(); + if (topics != null) { + for (int i = 0; i < topics.length; i++) { + onUnSubscribe(topics[i]); + } + } + UNSUBACK ack = new UNSUBACK(); + ack.messageId(command.messageId()); + sendToMQTT(ack.encode()); + + } + + void onUnSubscribe(UTF8Buffer topicName) { + MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName); + if (subs != null) { + ConsumerInfo info = subs.getConsumerInfo(); + if (info != null) { + subscriptionsByConsumerId.remove(info.getConsumerId()); + } + RemoveInfo removeInfo = info.createRemoveCommand(); + sendToActiveMQ(removeInfo, null); + } + } + /** * Dispatch a ActiveMQ command */ - public void onActiveMQCommand(Command command) throws IOException, JMSException { + public void onActiveMQCommand(Command command) throws Exception { if (command.isResponse()) { Response response = (Response) command; ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); @@ -252,28 +324,59 @@ class MQTTProtocolConverter { } } else if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch) command; - StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); + MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); if (sub != null) { - //sub.onMessageDispatch(md); + MessageAck ack = sub.createMessageAck(md); + PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage()); + if (ack != null) { + synchronized (consumerAcks) { + consumerAcks.put(publish.messageId(), ack); + } + } + getMQTTTransport().sendToMQTT(publish.encode()); } } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { // Pass down any unexpected async errors. Should this close the connection? Throwable exception = ((ConnectionError) command).getException(); handleException(exception, null); + } else if (command.isBrokerInfo()) { + //ignore } else { LOG.debug("Do not know how to process ActiveMQ Command " + command); } } + void onMQTTPublish(PUBLISH command) throws IOException, JMSException { + checkConnected(); + ActiveMQMessage message = convertMessage(command); + message.setProducerId(producerId); + message.onSend(); + sendToActiveMQ(message, createResponseHandler(command)); + } - public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException { + void onMQTTPubAck(PUBACK command) { + short messageId = command.messageId(); + MessageAck ack = null; + synchronized (consumerAcks) { + ack = consumerAcks.remove(messageId); + } + if (ack != null) { + getMQTTTransport().sendToActiveMQ(ack); + } + } + + + ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); - StringBuilder msgId = new StringBuilder(); - msgId.append("ID:").append(getClientId()).append(":").append(command.messageId()); - msg.setJMSMessageID(msgId.toString()); - msg.setJMSPriority(4); - //ActiveMQTopic topic = new ActiveMQTopic(topicName); + msg.setProducerId(producerId); + MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); + msg.setMessageId(id); + msg.setTimestamp(System.currentTimeMillis()); + msg.setPriority((byte) Message.DEFAULT_PRIORITY); + msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); + msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); + ActiveMQTopic topic = null; synchronized (activeMQTopicMap) { topic = activeMQTopicMap.get(command.topicName()); @@ -288,36 +391,48 @@ class MQTTProtocolConverter { return msg; } - public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { + public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { PUBLISH result = new PUBLISH(); - String msgId = message.getJMSMessageID(); - int offset = msgId.lastIndexOf(':'); - - short id = 0; - if (offset > 0) { - Short.parseShort(msgId.substring(offset, msgId.length() - 1)); - } + short id = (short) message.getMessageId().getProducerSequenceId(); result.messageId(id); + QoS qoS; + if (message.propertyExists(QOS_PROPERTY_NAME)) { + int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); + qoS = QoS.values()[ordinal]; - UTF8Buffer topicName = null; + } else { + qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; + } + result.qos(qoS); + + UTF8Buffer topicName; synchronized (mqttTopicMap) { topicName = mqttTopicMap.get(message.getJMSDestination()); if (topicName == null) { - topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replaceAll(".", "/")); + topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.', '/')); mqttTopicMap.put(message.getJMSDestination(), topicName); } } result.topicName(topicName); - if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { + ByteSequence byteSequence = message.getContent(); + if (message.isCompressed()) { + Inflater inflater = new Inflater(); + inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); + byte[] data = new byte[4096]; + int read; + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + while ((read = inflater.inflate(data, 0, data.length)) != 0) { + bytesOut.write(data, 0, read); + } + byteSequence = bytesOut.toByteSequence(); + } - if (!message.isCompressed() && message.getContent() != null) { - ByteSequence msgContent = message.getContent(); - if (msgContent.getLength() > 4) { - byte[] content = new byte[msgContent.getLength() - 4]; - System.arraycopy(msgContent.data, 4, content, 0, content.length); - result.payload(new Buffer(content)); - } + if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { + if (byteSequence.getLength() > 4) { + byte[] content = new byte[byteSequence.getLength() - 4]; + System.arraycopy(byteSequence.data, 4, content, 0, content.length); + result.payload(new Buffer(content)); } else { ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); String messageText = msg.getText(); @@ -334,9 +449,11 @@ class MQTTProtocolConverter { msg.readBytes(data); result.payload(new Buffer(data)); } else { - LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH"); + if (byteSequence != null && byteSequence.getLength() > 0) { + result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); + } } - return result.encode(); + return result; } @@ -359,12 +476,8 @@ class MQTTProtocolConverter { return rc; } - public String getCreatedTempDestinationName(ActiveMQDestination destination) { - return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); - } - - protected void configureInactivityMonitor(short heartBeat) throws ProtocolException { + void configureInactivityMonitor(short heartBeat) { try { int heartBeatMS = heartBeat * 1000; @@ -372,18 +485,17 @@ class MQTTProtocolConverter { monitor.setReadCheckTime(heartBeatMS); monitor.setInitialDelayTime(heartBeatMS); - monitor.startMonitorThread(); } catch (Exception ex) { - + LOG.warn("Failed to start MQTT InactivityMonitor ", ex); } LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); } - protected void handleException(Throwable exception, MQTTFrame command) throws IOException { + void handleException(Throwable exception, MQTTFrame command) { LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Exception detail", exception); @@ -396,6 +508,12 @@ class MQTTProtocolConverter { } } + void checkConnected() throws MQTTProtocolException { + if (!connected.get()) { + throw new MQTTProtocolException("Not connected."); + } + } + private String getClientId() { if (clientId == null) { if (connect != null && connect.clientId() != null) { @@ -414,4 +532,67 @@ class MQTTProtocolConverter { LOG.debug("Failed to stop MQTT transport ", e); } } + + ResponseHandler createResponseHandler(final PUBLISH command) { + + if (command != null) { + switch (command.qos()) { + case AT_LEAST_ONCE: + return new ResponseHandler() { + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); + } else { + PUBACK ack = new PUBACK(); + ack.messageId(command.messageId()); + converter.getMQTTTransport().sendToMQTT(ack.encode()); + } + } + }; + case EXACTLY_ONCE: + return new ResponseHandler() { + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); + } else { + PUBACK ack = new PUBACK(); + ack.messageId(command.messageId()); + converter.getMQTTTransport().sendToMQTT(ack.encode()); + } + } + }; + case AT_MOST_ONCE: + break; + } + } + /* + final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if (receiptId != null) { + return new ResponseHandler() { + public void onResponse(ProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + // Generally a command can fail.. but that does not invalidate the connection. + // We report back the failure but we don't close the connection. + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, command); + } else { + StompFrame sc = new StompFrame(); + sc.setAction(Stomp.Responses.RECEIPT); + sc.setHeaders(new HashMap(1)); + sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + stompTransport.sendToStomp(sc); + } + } + }; + } + */ + return null; + } + + private String convertMQTTToActiveMQ(String name) { + String result = name.replace('>', '#'); + result = result.replace('*', '+'); + result = result.replace('.', '/'); + return result; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java new file mode 100644 index 0000000000..1bb12f2773 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * A MQTT over SSL transport factory + */ +public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { + + private BrokerContext brokerContext = null; + + protected String getDefaultWireFormatType() { + return "mqtt"; + } + + @SuppressWarnings("rawtypes") + + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new MQTTTransportFilter(transport, format, brokerContext); + IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); + } + + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + + public void setBrokerService(BrokerService brokerService) { + this.brokerContext = brokerService.getBrokerContext(); + } + + protected Transport createInactivityMonitor(Transport transport, WireFormat format) { + MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format); + + MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class); + filter.setInactivityMonitor(monitor); + + return monitor; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java index ba6776c158..16105268c9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -16,70 +16,40 @@ */ package org.apache.activemq.transport.mqtt; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.LinkedList; - -import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.client.QoS; /** - * Keeps track of the STOMP subscription so that acking is correctly done. + * Keeps track of the MQTT client subscription so that acking is correctly done. */ -public class MQTTSubscription { +class MQTTSubscription { + private final MQTTProtocolConverter protocolConverter; + private final ConsumerInfo consumerInfo; + private ActiveMQDestination destination; + private final QoS qos; - protected final MQTTProtocolConverter protocolConverter; - protected final String subscriptionId; - protected final ConsumerInfo consumerInfo; - - protected final LinkedHashMap dispatchedMessage = new LinkedHashMap(); - protected final LinkedList unconsumedMessage = new LinkedList(); - - protected ActiveMQDestination destination; - protected String transformation; - - public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { + public MQTTSubscription(MQTTProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) { this.protocolConverter = protocolConverter; - this.subscriptionId = subscriptionId; this.consumerInfo = consumerInfo; - this.transformation = transformation; + this.qos = qos; } - void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { - ActiveMQMessage message = (ActiveMQMessage) md.getMessage(); - /* - if (ackMode == CLIENT_ACK) { - synchronized (this) { - dispatchedMessage.put(message.getMessageId(), md); + MessageAck createMessageAck(MessageDispatch md) { + + switch (qos) { + case AT_MOST_ONCE: { + return null; } - } else if (ackMode == INDIVIDUAL_ACK) { - synchronized (this) { - dispatchedMessage.put(message.getMessageId(), md); - } - } else if (ackMode == AUTO_ACK) { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); - protocolConverter.getStompTransport().sendToActiveMQ(ack); + } - */ - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); - protocolConverter.getMQTTTransport().sendToActiveMQ(ack); - - MQTTFrame command = protocolConverter.convertMessage(message); - protocolConverter.getMQTTTransport().sendToMQTT(command); + return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); } - public String getSubscriptionId() { - return subscriptionId; - } - public void setDestination(ActiveMQDestination destination) { this.destination = destination; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java index c82e00e587..de50cf215c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java @@ -29,7 +29,7 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; /** - * A STOMP transport factory + * A MQTT transport factory */ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index 5093421c53..fe0aa92f35 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -36,9 +36,7 @@ import org.slf4j.LoggerFactory; * The StompTransportFilter normally sits on top of a TcpTransport that has been * configured with the StompWireFormat and is used to convert STOMP commands to * ActiveMQ commands. All of the conversion work is done by delegating to the - * MQTTProtocolConverter. - * - * @author chirino + * MQTTProtocolConverter */ public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); @@ -62,7 +60,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor try { final Command command = (Command) o; protocolConverter.onActiveMQCommand(command); - } catch (JMSException e) { + } catch (Exception e) { throw IOExceptionSupport.create(e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java deleted file mode 100644 index 9c922ec36c..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -public class WildCardConvertor { - - static String convertActiveMQToMQTT(String name) { - String result = name.replaceAll("#", ">"); - result = result.replaceAll("+", "*"); - result = result.replaceAll("/", "."); - return result; - } - - static String convertMQTTToActiveMQ(String name) { - String result = name.replaceAll(">", "#"); - result = result.replaceAll("*", "+"); - result = result.replaceAll(".", "/"); - return result; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html index 4f7aed8072..3fafdaca5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html @@ -19,7 +19,7 @@ -An implementation of the MQTT 3.1 protocol - see http://mqtt.org/ +A Broker side implementation of the MQTT 3.1 protocol - see http://mqtt.org/ diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl new file mode 100644 index 0000000000..91f6ab496b --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.mqtt.MQTTSslTransportFactory diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java index b5207f0c78..9578bc3a9e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// https://issues.apache.org/jira/browse/AMQ-3393 + public class MQTTConnectTest { private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); BrokerService brokerService; @@ -54,7 +54,7 @@ public class MQTTConnectTest { brokerService.addConnector("mqtt://localhost:1883"); brokerService.start(); MQTT mqtt = new MQTT(); - mqtt.setHost("localhost",1883); + mqtt.setHost("localhost", 1883); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -62,5 +62,5 @@ public class MQTTConnectTest { connection.disconnect(); } - + } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java new file mode 100644 index 0000000000..9c79ffeccc --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Vector; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import org.apache.activemq.broker.BrokerService; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MQTTSSLConnectTest { + private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLConnectTest.class); + BrokerService brokerService; + Vector exceptions = new Vector(); + + @Before + public void startBroker() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + + exceptions.clear(); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testConnect() throws Exception { + + brokerService.addConnector("mqtt+ssl://localhost:8883"); + brokerService.start(); + MQTT mqtt = new MQTT(); + mqtt.setHost("ssl://localhost:8883"); + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); + mqtt.setSslContext(ctx); + BlockingConnection connection = mqtt.blockingConnection(); + + connection.connect(); + Thread.sleep(1000); + connection.disconnect(); + } + + + private static class DefaultTrustManager implements X509TrustManager { + + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 9ba28dae5f..d1b9fb9728 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,1987 +16,102 @@ */ package org.apache.activemq.transport.mqtt; -import java.io.IOException; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.Vector; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerViewMBean; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.transport.stomp.SamplePojo; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompConnection; -import org.apache.activemq.transport.stomp.StompFrame; -import org.apache.activemq.util.Wait; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.junit.Assert.assertEquals; -public class MQTTTest extends CombinationTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); - protected String bindAddress = "mqtt://localhost:1883"; - protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; - protected String jmsUri = "vm://localhost"; +public class MQTTTest { + private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); + BrokerService brokerService; + Vector exceptions = new Vector(); - private BrokerService broker; - protected StompConnection stompConnection = new StompConnection(); - protected Connection connection; - protected Session session; - protected ActiveMQQueue queue; - private final String xmlObject = "\n" - + " Dejan\n" - + " Belgrade\n" - + ""; - - private String xmlMap = "\n" - + " \n" - + " name\n" - + " Dejan\n" - + " \n" - + " \n" - + " city\n" - + " Belgrade\n" - + " \n" - + "\n"; - - private final String jsonObject = "{\"pojo\":{" - + "\"name\":\"Dejan\"," - + "\"city\":\"Belgrade\"" - + "}}"; - - private String jsonMap = "{\"map\":{" - + "\"entry\":[" - + "{\"string\":[\"name\",\"Dejan\"]}," - + "{\"string\":[\"city\",\"Belgrade\"]}" - + "]" - + "}}"; - - @Override - protected void setUp() throws Exception { - // The order of the entries is different when using ibm jdk 5. - if (System.getProperty("java.vendor").equals("IBM Corporation") - && System.getProperty("java.version").startsWith("1.5")) { - xmlMap = "\n" - + " \n" - + " city\n" - + " Belgrade\n" - + " \n" - + " \n" - + " name\n" - + " Dejan\n" - + " \n" - + "\n"; - jsonMap = "{\"map\":{" - + "\"entry\":[" - + "{\"string\":[\"city\",\"Belgrade\"]}," - + "{\"string\":[\"name\",\"Dejan\"]}" - + "]" - + "}}"; - } - broker = BrokerFactory.createBroker(new URI(confUri)); - broker.setDeleteAllMessagesOnStartup(true); - broker.start(); - broker.waitUntilStarted(); - - stompConnect(); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri); - connection = cf.createConnection("system", "manager"); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = new ActiveMQQueue(getQueueName()); - connection.start(); + @Before + public void startBroker() throws Exception { + exceptions.clear(); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); } - protected void stompConnect() throws IOException, URISyntaxException, UnknownHostException { - URI connectUri = new URI(bindAddress); - stompConnection.open(createSocket(connectUri)); - } - - private void stompConnect(StompConnection connection) throws IOException, URISyntaxException, UnknownHostException { - URI connectUri = new URI(bindAddress); - connection.open(createSocket(connectUri)); - } - - protected Socket createSocket(URI connectUri) throws IOException { - return new Socket("127.0.0.1", connectUri.getPort()); - } - - protected String getQueueName() { - return getClass().getName() + "." + getName(); - } - - @Override - protected void tearDown() throws Exception { - try { - connection.close(); - stompDisconnect(); - } catch(Exception e) { - // Some tests explicitly disconnect from stomp so can ignore - } finally { - broker.stop(); - broker.waitUntilStopped(); + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); } } - protected void stompDisconnect() throws IOException { - if (stompConnection != null) { - stompConnection.close(); - stompConnection = null; + @Test + public void testSendAndReceiveAtLeastOnce() throws Exception { + + brokerService.addConnector("mqtt://localhost:1883"); + brokerService.start(); + MQTT mqtt = new MQTT(); + mqtt.setHost("localhost", 1883); + BlockingConnection connection = mqtt.blockingConnection(); + + connection.connect(); + + Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; + connection.subscribe(topics); + for (int i = 0; i < 10000; i++) { + String payload = "Test Message: " + i; + connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Message message = connection.receive(); + message.ack(); + assertEquals(payload, new String(message.getPayload())); } + connection.disconnect(); } - public void sendMessage(String msg) throws Exception { - sendMessage(msg, "foo", "xyz"); - } - - public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException { - MessageProducer producer = session.createProducer(queue); - TextMessage message = session.createTextMessage(msg); - message.setStringProperty(propertyName, propertyValue); - producer.send(message); - } - - public void sendBytesMessage(byte[] msg) throws Exception { - MessageProducer producer = session.createProducer(queue); - BytesMessage message = session.createBytesMessage(); - message.writeBytes(msg); - producer.send(message); - } - - public void testConnect() throws Exception { - - String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL; - stompConnection.sendFrame(connectFrame); - - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("CONNECTED")); - assertTrue(f.indexOf("response-id:1") >= 0); - - } - - public void testSendMessage() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("Hello World", message.getText()); - - // Make sure that the timestamp is valid - should - // be very close to the current time. - long tnow = System.currentTimeMillis(); - long tmsg = message.getJMSTimestamp(); - assertTrue(Math.abs(tnow - tmsg) < 1000); - } - - public void testJMSXGroupIdCanBeSet() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "JMSXGroupID: TEST\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID()); - } - - public void testSendMessageWithCustomHeadersAndSelector() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("Hello World", message.getText()); - assertEquals("foo", "abc", message.getStringProperty("foo")); - assertEquals("bar", "123", message.getStringProperty("bar")); - } - - public void testSendMessageWithDelay() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "AMQ_SCHEDULED_DELAY:5000\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2000); - assertNull(message); - message = (TextMessage)consumer.receive(5000); - assertNotNull(message); - } - - public void testSendMessageWithStandardHeaders() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "correlation-id:c123\n" + "priority:3\n" + "type:t345\n" + "JMSXGroupID:abc\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" - + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("Hello World", message.getText()); - assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID()); - assertEquals("getJMSType", "t345", message.getJMSType()); - assertEquals("getJMSPriority", 3, message.getJMSPriority()); - assertEquals("foo", "abc", message.getStringProperty("foo")); - assertEquals("bar", "123", message.getStringProperty("bar")); - - assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); - ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message; - assertEquals("GroupID", "abc", amqMessage.getGroupID()); - } - - public void testSendMessageWithNoPriorityReceivesDefault() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "correlation-id:c123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" - + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("Hello World", message.getText()); - assertEquals("getJMSPriority", 4, message.getJMSPriority()); - } - - public void testReceipts() throws Exception { - - StompConnection receiver = new StompConnection(); - URI connectUri = new URI(bindAddress); - receiver.open(createSocket(connectUri)); - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - - frame = receiver.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - - frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = receiver.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue("Stomp Message does not contain receipt request", frame.indexOf(Stomp.Headers.RECEIPT_REQUESTED) == -1); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("RECEIPT")); - assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - - MessageConsumer consumer = session.createConsumer(queue); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("RECEIPT")); - assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); - - TextMessage message = (TextMessage)consumer.receive(10000); - assertNotNull(message); - assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - - public void testSubscriptionReceipts() throws Exception { - final int done = 500; - int count = 0; - int receiptId = 0; - - URI connectUri = new URI(bindAddress); - - do { - - StompConnection sender = new StompConnection(); - sender.open(createSocket(connectUri)); - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - sender.sendFrame(frame); - - frame = sender.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" + Stomp.NULL; - sender.sendFrame(frame); - frame = sender.receiveFrame(); - assertTrue("" + frame, frame.startsWith("RECEIPT")); - - sender.disconnect(); - - StompConnection receiver = new StompConnection(); - receiver.open(createSocket(connectUri)); - - frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - - frame = receiver.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - - frame = receiver.receiveFrame(); - assertTrue("" + frame, frame.startsWith("RECEIPT")); - assertTrue("Receipt contains receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); - frame = receiver.receiveFrame(); - assertTrue("" + frame, frame.startsWith("MESSAGE")); - - // remove suscription so we don't hang about and get next message - frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL; - receiver.sendFrame(frame); - frame = receiver.receiveFrame(); - assertTrue("" + frame, frame.startsWith("RECEIPT")); - - receiver.disconnect(); - } while (count < done); - - } - - public void testSubscribeWithAutoAck() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - sendMessage(getName()); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithAutoAckAndBytesMessage() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - sendBytesMessage(new byte[] { - 1, 2, 3, 4, 5 - }); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); - Matcher clMmatcher = cl.matcher(frame); - assertTrue(clMmatcher.find()); - assertEquals("5", clMmatcher.group(1)); - - assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find()); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testBytesMessageWithNulls() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame message = stompConnection.receive(); - assertTrue(message.getAction().startsWith("MESSAGE")); - - String length = message.getHeaders().get("content-length"); - assertEquals("5", length); - - assertEquals(5, message.getContent().length); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSendMultipleBytesMessages() throws Exception { - - final int MSG_COUNT = 50; - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - for( int ix = 0; ix < MSG_COUNT; ix++) { - frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL; - stompConnection.sendFrame(frame); + @Test + public void testSendMQTTReceiveJMS() throws Exception { + + brokerService.addConnector("mqtt://localhost:1883"); + brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); + brokerService.start(); + MQTT mqtt = new MQTT(); + mqtt.setHost("localhost", 1883); + BlockingConnection connection = mqtt.blockingConnection(); + final String DESTINATION_NAME = "foo"; + connection.connect(); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); + MessageConsumer consumer = s.createConsumer(jmsTopic); + + for (int i = 0; i < 10000; i++) { + String payload = "Test Message: " + i; + connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); + ByteSequence bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); } - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - for( int ix = 0; ix < MSG_COUNT; ix++) { - StompFrame message = stompConnection.receive(); - assertTrue(message.getAction().startsWith("MESSAGE")); - - String length = message.getHeaders().get("content-length"); - assertEquals("5", length); - - assertEquals(5, message.getContent().length); - } - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); + activeMQConnection.close(); + connection.disconnect(); } - public void testSubscribeWithMessageSentWithProperties() throws Exception { - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - MessageProducer producer = session.createProducer(queue); - TextMessage message = session.createTextMessage("Hello World"); - message.setStringProperty("s", "value"); - message.setBooleanProperty("n", false); - message.setByteProperty("byte", (byte)9); - message.setDoubleProperty("d", 2.0); - message.setFloatProperty("f", (float)6.0); - message.setIntProperty("i", 10); - message.setLongProperty("l", 121); - message.setShortProperty("s", (short)12); - producer.send(message); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testMessagesAreInOrder() throws Exception { - int ctr = 10; - String[] data = new String[ctr]; - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - for (int i = 0; i < ctr; ++i) { - data[i] = getName() + i; - sendMessage(data[i]); - } - - for (int i = 0; i < ctr; ++i) { - frame = stompConnection.receiveFrame(); - assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); - } - - // sleep a while before publishing another set of messages - TimeUnit.SECONDS.sleep(2); - - for (int i = 0; i < ctr; ++i) { - data[i] = getName() + ":second:" + i; - sendMessage(data[i]); - } - - for (int i = 0; i < ctr; ++i) { - frame = stompConnection.receiveFrame(); - assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); - } - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithAutoAckAndSelector() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 'zzz'\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - sendMessage("Ignored message", "foo", "1234"); - sendMessage("Real message", "foo", "zzz"); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithAutoAckAndNumericSelector() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 42\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Ignored - frame = "SEND\n" + "foo:abc\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Matches - frame = "SEND\n" + "foo:42\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithAutoAckAndBooleanSelector() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = true\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Ignored - frame = "SEND\n" + "foo:false\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Matches - frame = "SEND\n" + "foo:true\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithAutoAckAnFloatSelector() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 3.14159\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Ignored - frame = "SEND\n" + "foo:6.578\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // Matches - frame = "SEND\n" + "foo:3.14159\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testSubscribeWithClientAck() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - - stompConnection.sendFrame(frame); - sendMessage(getName()); - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - stompDisconnect(); - - // message should be received since message was not acknowledged - MessageConsumer consumer = session.createConsumer(queue); - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertTrue(message.getJMSRedelivered()); - } - - public void testSubscribeWithClientAckedAndContentLength() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - - stompConnection.sendFrame(frame); - sendMessage(getName()); - StompFrame msg = stompConnection.receive(); - - - assertTrue(msg.getAction().equals("MESSAGE")); - - HashMap ackHeaders = new HashMap(); - ackHeaders.put("message-id", msg.getHeaders().get("message-id")); - ackHeaders.put("content-length", "8511"); - - StompFrame ack = new StompFrame("ACK", ackHeaders); - stompConnection.sendFrame(ack.format()); - - final QueueViewMBean queueView = getProxyToQueue(getQueueName()); - assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount()); - return queueView.getDequeueCount() == 1; - } - })); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - stompDisconnect(); - - // message should not be received since it was acknowledged - MessageConsumer consumer = session.createConsumer(queue); - TextMessage message = (TextMessage)consumer.receive(500); - assertNull(message); - } - - public void testUnsubscribe() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // send a message to our queue - sendMessage("first message"); - - // receive message from socket - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:1" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue("" + frame, frame.startsWith("RECEIPT")); - - // send a message to our queue - sendMessage("second message"); - - try { - frame = stompConnection.receiveFrame(); - LOG.info("Received frame: " + frame); - fail("No message should have been received since subscription was removed"); - } catch (SocketTimeoutException e) { - } - } - - public void testTransactionCommit() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("CONNECTED")); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n\n" + "Hello World" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(10000); - assertNotNull("Should have received a message", message); - } - - public void testTransactionRollback() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("CONNECTED")); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "first message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // rollback first message - frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - // only second msg should be received since first msg was rolled back - TextMessage message = (TextMessage)consumer.receive(10000); - assertNotNull(message); - assertEquals("second message", message.getText().trim()); - } - - public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception { - assertClients(1); - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - assertClients(2); - - // now lets kill the stomp connection - stompConnection.close(); - - assertClients(1); - } - - public void testConnectNotAuthenticatedWrongUser() throws Exception { - String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - String f = stompConnection.receiveFrame(); - - assertTrue(f.startsWith("ERROR")); - assertClients(1); - } - - public void testConnectNotAuthenticatedWrongPassword() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - String f = stompConnection.receiveFrame(); - - assertTrue(f.startsWith("ERROR")); - assertClients(1); - } - - public void testSendNotAuthorized() throws Exception { - - String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("ERROR")); - } - - public void testSubscribeNotAuthorized() throws Exception { - - String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - - stompConnection.sendFrame(frame); - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("ERROR")); - - } - - public void testTransformationUnknownTranslator() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test" + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertEquals("Hello World", message.getText()); - } - - public void testTransformationFailed() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(2500); - assertNotNull(message); - assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR)); - assertEquals("Hello World", message.getText()); - } - - public void testTransformationSendXMLObject() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL; - - stompConnection.sendFrame(frame); - - ObjectMessage message = (ObjectMessage)consumer.receive(2500); - assertNotNull(message); - SamplePojo object = (SamplePojo)message.getObject(); - assertEquals("Dejan", object.getName()); - } - - public void testTransformationSendJSONObject() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL; - - stompConnection.sendFrame(frame); - - ObjectMessage message = (ObjectMessage)consumer.receive(2500); - assertNotNull(message); - SamplePojo object = (SamplePojo)message.getObject(); - assertEquals("Dejan", object.getName()); - } - - public void testTransformationSubscribeXML() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlObject)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveJSONObject() throws Exception { - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(jsonObject)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveXMLObject() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlObject)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveObject() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlObject)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveXMLObjectAndMap() throws Exception { - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(objMessage); - - MapMessage mapMessage = session.createMapMessage(); - mapMessage.setString("name", "Dejan"); - mapMessage.setString("city", "Belgrade"); - producer.send(mapMessage); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlObject)); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlMap.trim())); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveJSONObjectAndMap() throws Exception { - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - producer.send(objMessage); - - MapMessage mapMessage = session.createMapMessage(); - mapMessage.setString("name", "Dejan"); - mapMessage.setString("city", "Belgrade"); - producer.send(mapMessage); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(jsonObject)); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(jsonMap.trim())); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationSendAndReceiveXmlMap() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL; - - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertNotNull(frame); - assertTrue(frame.trim().endsWith(xmlMap.trim())); - assertTrue(frame.contains("jms-map-xml")); - } - - public void testTransformationSendAndReceiveJsonMap() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL; - - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertNotNull(frame); - assertTrue(frame.trim().endsWith(jsonMap.trim())); - assertTrue(frame.contains("jms-map-json")); - } - - public void testTransformationReceiveBytesMessage() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[]{1, 2, 3, 4, 5}); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); - - Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); - Matcher clMmatcher = cl.matcher(frame); - assertTrue(clMmatcher.find()); - assertEquals("5", clMmatcher.group(1)); - - assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find()); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationNotOverrideSubscription() throws Exception { - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(jsonObject)); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationIgnoreTransformation() throws Exception { - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); - message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString()); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.endsWith("\n\n")); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationSendXMLMap() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL; - - stompConnection.sendFrame(frame); - - MapMessage message = (MapMessage) consumer.receive(2500); - assertNotNull(message); - assertEquals(message.getString("name"), "Dejan"); - } - - public void testTransformationSendJSONMap() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL; - - stompConnection.sendFrame(frame); - - MapMessage message = (MapMessage) consumer.receive(2500); - assertNotNull(message); - assertEquals(message.getString("name"), "Dejan"); - } - - public void testTransformationReceiveXMLMap() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - MapMessage message = session.createMapMessage(); - message.setString("name", "Dejan"); - message.setString("city", "Belgrade"); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(xmlMap.trim())); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransformationReceiveJSONMap() throws Exception { - - MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); - MapMessage message = session.createMapMessage(); - message.setString("name", "Dejan"); - message.setString("city", "Belgrade"); - producer.send(message); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - - assertTrue(frame.trim().endsWith(jsonMap.trim())); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testDurableUnsub() throws Exception { - // get broker JMX view - - String domain = "org.apache.activemq"; - ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); - - BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); - - // connect - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - assertEquals(view.getDurableTopicSubscribers().length, 0); - - // subscribe - frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - // wait a bit for MBean to get refreshed - try { - Thread.sleep(400); - } catch (InterruptedException e){} - - assertEquals(view.getDurableTopicSubscribers().length, 1); - // disconnect - frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} - - //reconnect - stompConnect(); - // connect - frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - // unsubscribe - frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} - assertEquals(view.getDurableTopicSubscribers().length, 0); - } - - public void testMessageIdHeader() throws Exception { - stompConnection.connect("system", "manager"); - - stompConnection.begin("tx1"); - stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null); - stompConnection.commit("tx1"); - - stompConnection.subscribe("/queue/" + getQueueName()); - StompFrame stompMessage = stompConnection.receive(); - assertNull(stompMessage.getHeaders().get("transaction")); - } - - public void testPrefetchSize() throws Exception { - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - headers.put("activemq.prefetchSize", "1"); - stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); - - // send messages using JMS - sendMessage("message 1"); - sendMessage("message 2"); - sendMessage("message 3"); - sendMessage("message 4"); - sendMessage("message 5"); - - StompFrame frame = stompConnection.receive(); - assertEquals(frame.getBody(), "message 1"); - - stompConnection.begin("tx1"); - stompConnection.ack(frame, "tx1"); - - StompFrame frame1 = stompConnection.receive(); - assertEquals(frame1.getBody(), "message 2"); - - try { - StompFrame frame2 = stompConnection.receive(500); - if (frame2 != null) { - fail("Should not have received the second message"); - } - } catch (SocketTimeoutException soe) {} - - stompConnection.ack(frame1, "tx1"); - Thread.sleep(1000); - stompConnection.abort("tx1"); - - stompConnection.begin("tx2"); - - // Previously delivered message need to get re-acked... - stompConnection.ack(frame, "tx2"); - stompConnection.ack(frame1, "tx2"); - - StompFrame frame3 = stompConnection.receive(); - assertEquals(frame3.getBody(), "message 3"); - stompConnection.ack(frame3, "tx2"); - - StompFrame frame4 = stompConnection.receive(); - assertEquals(frame4.getBody(), "message 4"); - stompConnection.ack(frame4, "tx2"); - - stompConnection.commit("tx2"); - - stompConnection.begin("tx3"); - StompFrame frame5 = stompConnection.receive(); - assertEquals(frame5.getBody(), "message 5"); - stompConnection.ack(frame5, "tx3"); - stompConnection.commit("tx3"); - - stompDisconnect(); - } - - public void testTransactionsWithMultipleDestinations() throws Exception { - - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - headers.put("activemq.prefetchSize", "1"); - headers.put("activemq.exclusive", "true"); - - stompConnection.subscribe("/queue/test1", "client", headers); - - stompConnection.begin("ID:tx1"); - - headers.clear(); - headers.put("receipt", "ID:msg1"); - stompConnection.send("/queue/test2", "test message", "ID:tx1", headers); - - stompConnection.commit("ID:tx1"); - - // make sure connection is active after commit - Thread.sleep(1000); - stompConnection.send("/queue/test1", "another message"); - - StompFrame frame = stompConnection.receive(500); - assertNotNull(frame); - - stompConnection.disconnect(); - } - - public void testTempDestination() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/temp-queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame message = stompConnection.receive(1000); - assertEquals("Hello World", message.getBody()); - } - - public void testJMSXUserIDIsSetInMessage() throws Exception { - - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - - stompConnection.sendFrame(frame); - - TextMessage message = (TextMessage)consumer.receive(5000); - assertNotNull(message); - assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID)); - } - - public void testJMSXUserIDIsSetInStompMessage() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame message = stompConnection.receive(5000); - assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID)); - } - - public void testClientSetMessageIdIsIgnored() throws Exception { - HashMap headers = new HashMap(); - headers.put(Stomp.Headers.Message.MESSAGE_ID, "Thisisnotallowed"); - headers.put(Stomp.Headers.Message.TIMESTAMP, "1234"); - headers.put(Stomp.Headers.Message.REDELIVERED, "true"); - headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed"); - headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed"); - - stompConnection.connect("system", "manager"); - - stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); - - stompConnection.subscribe("/queue/" + getQueueName()); - StompFrame stompMessage = stompConnection.receive(); - - Map mess_headers = new HashMap(); - mess_headers = stompMessage.getHeaders(); - - assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID) - )); - assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP))); - assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED)); - assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION)); - assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID)); - } - - public void testExpire() throws Exception { - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - long timestamp = System.currentTimeMillis(); - headers.put(Stomp.Headers.Message.EXPIRATION_TIME, String.valueOf(timestamp)); - headers.put(Stomp.Headers.Send.PERSISTENT, "true"); - - stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); - - stompConnection.subscribe("/queue/ActiveMQ.DLQ"); - StompFrame stompMessage = stompConnection.receive(1000); - assertNotNull(stompMessage); - assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.ORIGINAL_DESTINATION), "/queue/" + getQueueName()); - } - - public void testDefaultJMSReplyToDest() throws Exception { - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString"); - headers.put(Stomp.Headers.Send.PERSISTENT, "true"); - - stompConnection.send("/queue/" + getQueueName(), "msg-with-reply-to", null, headers); - - stompConnection.subscribe("/queue/" + getQueueName()); - StompFrame stompMessage = stompConnection.receive(1000); - assertNotNull(stompMessage); - assertEquals("" + stompMessage, stompMessage.getHeaders().get(Stomp.Headers.Send.REPLY_TO), "JustAString"); - } - - public void testPersistent() throws Exception { - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - headers.put(Stomp.Headers.Message.PERSISTENT, "true"); - - stompConnection.send("/queue/" + getQueueName(), "hello", null, headers); - - stompConnection.subscribe("/queue/" + getQueueName()); - - StompFrame stompMessage = stompConnection.receive(); - assertNotNull(stompMessage); - assertNotNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); - assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT), "true"); - } - - public void testPersistentDefaultValue() throws Exception { - stompConnection.connect("system", "manager"); - - HashMap headers = new HashMap(); - - stompConnection.send("/queue/" + getQueueName(), "hello", null, headers); - - stompConnection.subscribe("/queue/" + getQueueName()); - - StompFrame stompMessage = stompConnection.receive(); - assertNotNull(stompMessage); - assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); - } - - public void testReceiptNewQueue() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame receipt = stompConnection.receive(); - assertTrue(receipt.getAction().startsWith("RECEIPT")); - assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id")); - - frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - receipt = stompConnection.receive(); - assertTrue(receipt.getAction().startsWith("RECEIPT")); - assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id")); - - StompFrame message = stompConnection.receive(); - assertTrue(message.getAction().startsWith("MESSAGE")); - - String length = message.getHeaders().get("content-length"); - assertEquals("0", length); - assertEquals(0, message.getContent().length); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - public void testTransactedClientAckBrokerStats() throws Exception { - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - sendMessage(getName()); - sendMessage(getName()); - - stompConnection.begin("tx1"); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame message = stompConnection.receive(); - assertTrue(message.getAction().equals("MESSAGE")); - stompConnection.ack(message, "tx1"); - - message = stompConnection.receive(); - assertTrue(message.getAction().equals("MESSAGE")); - stompConnection.ack(message, "tx1"); - - stompConnection.commit("tx1"); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - final QueueViewMBean queueView = getProxyToQueue(getQueueName()); - Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - return queueView.getDequeueCount() == 2; - } - }); - assertEquals(2, queueView.getDispatchCount()); - assertEquals(2, queueView.getDequeueCount()); - assertEquals(0, queueView.getQueueSize()); - } - - public void testReplytoModification() throws Exception { - String replyto = "some destination"; - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "reply-to:" + replyto + "\n\nhello world" + Stomp.NULL; - stompConnection.sendFrame(frame); - - StompFrame message = stompConnection.receive(); - assertTrue(message.getAction().equals("MESSAGE")); - assertEquals(replyto, message.getHeaders().get("reply-to")); - - stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL); - } - - public void testReplyToDestinationNaming() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - doTestActiveMQReplyToTempDestination("topic"); - doTestActiveMQReplyToTempDestination("queue"); - } - - public void testSendNullBodyTextMessage() throws Exception { - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - sendMessage(null); - frame = stompConnection.receiveFrame(); - assertNotNull("Message not received", frame); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - } - - private void doTestActiveMQReplyToTempDestination(String type) throws Exception { - LOG.info("Starting test on Temp Destinations using a temporary: " + type); - - final String dest = "/" + type + "/" + getQueueName(); - final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1", type); - LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest); - - // Subscribe both to the out-bound destination and the response tempt destination - stompConnection.subscribe(dest); - stompConnection.subscribe(tempDest); - - // Send a Message with the ReplyTo value set. - HashMap properties = new HashMap(); - properties.put(Stomp.Headers.Send.REPLY_TO, tempDest); - LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest)); - stompConnection.send(dest, "REQUEST", null, properties); - - // The subscription should receive a response with the ReplyTo property set. - StompFrame received = stompConnection.receive(); - assertNotNull(received); - String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO); - assertNotNull(remoteReplyTo); - assertTrue(remoteReplyTo.startsWith(String.format("/temp-%s/", type))); - LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo)); - - // Reply to the request using the given ReplyTo destination - stompConnection.send(remoteReplyTo, "RESPONSE"); - - // The response should be received by the Temporary Destination subscription - StompFrame reply = stompConnection.receive(); - assertNotNull(reply); - assertEquals("MESSAGE", reply.getAction()); - LOG.info(String.format("Response %s received", reply.getAction())); - - BrokerViewMBean broker = getProxyToBroker(); - if (type.equals("topic")) { - assertEquals(1, broker.getTemporaryTopics().length); - } else { - assertEquals(1, broker.getTemporaryQueues().length); - } - } - - public void testReplyToAcrossConnections() throws Exception { - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - doReplyToAcrossConnections("topic"); - doReplyToAcrossConnections("queue"); - } - - private void doReplyToAcrossConnections(String type) throws Exception { - LOG.info("Starting test on Temp Destinations using a temporary: " + type); - - StompConnection responder = new StompConnection(); - stompConnect(responder); - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - responder.sendFrame(frame); - - frame = responder.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - final String dest = "/" + type + "/" + getQueueName(); - final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1:1:0:1", type); - LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest); - - // Subscribe to the temp destination, this is where we get our response. - stompConnection.subscribe(tempDest); - - // Subscribe to the Queue, this is where we get our request. - responder.subscribe(dest); - - // Send a Message with the ReplyTo value set. - HashMap properties = new HashMap(); - properties.put(Stomp.Headers.Send.REPLY_TO, tempDest); - properties.put(Stomp.Headers.RECEIPT_REQUESTED, "send-1"); - LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest)); - stompConnection.send(dest, "REQUEST", null, properties); - - frame = stompConnection.receiveFrame(); - assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT")); - assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); - - - // The subscription should receive a response with the ReplyTo property set. - StompFrame received = responder.receive(); - assertNotNull(received); - String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO); - assertNotNull(remoteReplyTo); - assertTrue(remoteReplyTo.startsWith(String.format("/remote-temp-%s/", type))); - LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo)); - - // Reply to the request using the given ReplyTo destination - responder.send(remoteReplyTo, "RESPONSE"); - - // The response should be received by the Temporary Destination subscription - StompFrame reply = stompConnection.receive(); - assertNotNull(reply); - assertEquals("MESSAGE", reply.getAction()); - assertTrue(reply.getBody().contains("RESPONSE")); - LOG.info(String.format("Response %s received", reply.getAction())); - - BrokerViewMBean broker = getProxyToBroker(); - if (type.equals("topic")) { - assertEquals(1, broker.getTemporaryTopics().length); - } else { - assertEquals(1, broker.getTemporaryQueues().length); - } - } - - private BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException { - ObjectName brokerViewMBean = new ObjectName( - "org.apache.activemq:Type=Broker,BrokerName=localhost"); - BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext() - .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true); - return proxy; - } - - private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" - + ":Type=Queue,Destination=" + name - + ",BrokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() - .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - - protected void assertClients(final int expected) throws Exception { - Wait.waitFor(new Wait.Condition() - { - @Override - public boolean isSatisified() throws Exception { - return broker.getBroker().getClients().length == expected; - } - }); - org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); - int actual = clients.length; - - assertEquals("Number of clients", expected, actual); - } - - public void testDisconnectDoesNotDeadlockBroker() throws Exception { - for (int i = 0; i < 20; ++i) { - doTestConnectionLeak(); - } - } - - private void doTestConnectionLeak() throws Exception { - stompConnect(); - - String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("CONNECTED")); - - boolean gotMessage = false; - boolean gotReceipt = false; - - char[] payload = new char[1024]; - Arrays.fill(payload, 'A'); - - String test = "SEND\n" + - "x-type:DEV-3485\n" + - "x-uuid:" + UUID.randomUUID() + "\n" + - "persistent:true\n" + - "receipt:" + UUID.randomUUID() + "\n" + - "destination:/queue/test.DEV-3485" + - "\n\n" + - new String(payload) + Stomp.NULL; - - frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - stompConnection.sendFrame(test); - - // We only want one of them, to trigger the shutdown and potentially - // see a deadlock. - while (!gotMessage && !gotReceipt) { - frame = stompConnection.receiveFrame(); - - LOG.debug("Received the frame: " + frame); - - if (frame.startsWith("RECEIPT")) { - gotReceipt = true; - } else if(frame.startsWith("MESSAGE")) { - gotMessage = true; - } else { - fail("Received a frame that we were not expecting."); - } - } - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); - - stompConnection.close(); - } -} +} \ No newline at end of file