diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java index 54894fce53..1d826a44e6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java @@ -41,7 +41,6 @@ import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; */ public class LegacyFrameTranslator implements FrameTranslator { - public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { final Map headers = command.getHeaders(); final ActiveMQMessage msg; @@ -59,6 +58,7 @@ public class LegacyFrameTranslator implements FrameTranslator { data.writeInt(command.getContent().length); data.write(command.getContent()); text.setContent(bytes.toByteSequence()); + data.close(); } catch (Throwable e) { throw new ProtocolException("Text could not bet set: " + e, false, e); } @@ -83,6 +83,7 @@ public class LegacyFrameTranslator implements FrameTranslator { data.writeInt(command.getContent().length); data.write(command.getContent()); text.setContent(bytes.toByteSequence()); + data.close(); } catch (Throwable e) { throw new ProtocolException("Text could not bet set: " + e, false, e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 741ca1c9d1..1da70cad24 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -110,6 +110,9 @@ public class ProtocolConverter { private final Map transactions = new ConcurrentHashMap(); private final StompTransport stompTransport; + private final ConcurrentHashMap pedingAcks = new ConcurrentHashMap(); + private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); + private final Object commnadIdMutex = new Object(); private int lastCommandId; private final AtomicBoolean connected = new AtomicBoolean(false); @@ -121,6 +124,33 @@ public class ProtocolConverter { private long hbWriteInterval; private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; + private static class AckEntry { + + private String messageId; + private StompSubscription subscription; + + public AckEntry(String messageId, StompSubscription subscription) { + this.messageId = messageId; + this.subscription = subscription; + } + + public MessageAck onMessageAck(TransactionId transactionId) { + return subscription.onStompMessageAck(messageId, transactionId); + } + + public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { + return subscription.onStompMessageNack(messageId, transactionId); + } + + public String getMessageId() { + return this.messageId; + } + + public StompSubscription getSubscription() { + return this.subscription; + } + } + public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { this.stompTransport = stompTransport; this.brokerContext = brokerContext; @@ -301,15 +331,20 @@ public class ProtocolConverter { Map headers = command.getHeaders(); String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); - if (subscriptionId == null) { + if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) { throw new ProtocolException("NACK received without a subscription id for acknowledge!"); } String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); - if (messageId == null) { + if (messageId == null && !this.version.equals(Stomp.V1_2)) { throw new ProtocolException("NACK received without a message-id to acknowledge!"); } + String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); + if (ackId == null && this.version.equals(Stomp.V1_2)) { + throw new ProtocolException("NACK received without an ack header to acknowledge!"); + } + TransactionId activemqTx = null; String stompTx = headers.get(Stomp.Headers.TRANSACTION); if (stompTx != null) { @@ -319,17 +354,32 @@ public class ProtocolConverter { } } - if (subscriptionId != null) { + boolean nacked = false; + + if (ackId != null) { + AckEntry pendingAck = this.pedingAcks.get(ackId); + if (pendingAck != null) { + messageId = pendingAck.getMessageId(); + MessageAck ack = pendingAck.onMessageNack(activemqTx); + if (ack != null) { + sendToActiveMQ(ack, createResponseHandler(command)); + nacked = true; + } + } + } else if (subscriptionId != null) { StompSubscription sub = this.subscriptions.get(subscriptionId); if (sub != null) { MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); if (ack != null) { sendToActiveMQ(ack, createResponseHandler(command)); - } else { - throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); + nacked = true; } } } + + if (!nacked) { + throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); + } } protected void onStompAck(StompFrame command) throws ProtocolException { @@ -337,15 +387,20 @@ public class ProtocolConverter { Map headers = command.getHeaders(); String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); - if (messageId == null) { + if (messageId == null && !(this.version.equals(Stomp.V1_2))) { throw new ProtocolException("ACK received without a message-id to acknowledge!"); } String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); - if (this.version.equals(Stomp.V1_1) && subscriptionId == null) { + if (subscriptionId == null && this.version.equals(Stomp.V1_1)) { throw new ProtocolException("ACK received without a subscription id for acknowledge!"); } + String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); + if (ackId == null && this.version.equals(Stomp.V1_2)) { + throw new ProtocolException("ACK received without a ack id for acknowledge!"); + } + TransactionId activemqTx = null; String stompTx = headers.get(Stomp.Headers.TRANSACTION); if (stompTx != null) { @@ -357,7 +412,19 @@ public class ProtocolConverter { boolean acked = false; - if (subscriptionId != null) { + if (ackId != null) { + + AckEntry pendingAck = this.pedingAcks.get(ackId); + if (pendingAck != null) { + messageId = pendingAck.getMessageId(); + MessageAck ack = pendingAck.onMessageAck(activemqTx); + if (ack != null) { + sendToActiveMQ(ack, createResponseHandler(command)); + acked = true; + } + } + + } else if (subscriptionId != null) { StompSubscription sub = this.subscriptions.get(subscriptionId); if (sub != null) { @@ -370,7 +437,7 @@ public class ProtocolConverter { } else { - // TODO: acking with just a message id is very bogus since the same message id + // STOMP v1.0: acking with just a message id is very bogus since the same message id // could have been sent to 2 different subscriptions on the same Stomp connection. // For example, when 2 subs are created on the same topic. @@ -505,8 +572,8 @@ public class ProtocolConverter { } String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); - if( selector!=null ) { - consumerInfo.setSelector("convert_string_expressions:"+selector); + if (selector != null) { + consumerInfo.setSelector("convert_string_expressions:" + selector); } IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); @@ -606,6 +673,7 @@ public class ProtocolConverter { if (this.version.equals(Stomp.V1_1)) { clientId = connectionInfo.getClientId(); } + if (durable != null) { RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(clientId); @@ -733,7 +801,6 @@ public class ProtocolConverter { } } }); - } }); } @@ -775,7 +842,19 @@ public class ProtocolConverter { MessageDispatch md = (MessageDispatch)command; StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); if (sub != null) { - sub.onMessageDispatch(md); + String ackId = null; + if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO) { + AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); + ackId = this.ACK_ID_GENERATOR.generateId(); + this.pedingAcks.put(ackId, pendingAck); + } + try { + sub.onMessageDispatch(md, ackId); + } catch (Exception ex) { + if (ackId != null) { + this.pedingAcks.remove(ackId); + } + } } } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { stompTransport.sendToStomp(ping); @@ -846,15 +925,11 @@ public class ProtocolConverter { } try { - StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); - monitor.setReadCheckTime(hbReadInterval); monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); monitor.setWriteCheckTime(hbWriteInterval); - monitor.startMonitoring(); - } catch(Exception ex) { hbReadInterval = 0; hbWriteInterval = 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index c726220a18..a66b5eecdc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -32,11 +32,12 @@ public interface Stomp { String COMMA = ","; String V1_0 = "1.0"; String V1_1 = "1.1"; + String V1_2 = "1.2"; String DEFAULT_HEART_BEAT = "0,0"; String DEFAULT_VERSION = "1.0"; String EMPTY = ""; - String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"}; + String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.2", "1.1", "1.0"}; String TEXT_PLAIN = "text/plain"; String TRUE = "true"; @@ -100,6 +101,7 @@ public interface Stomp { public interface Message { String MESSAGE_ID = "message-id"; + String ACK_ID = "ack"; String DESTINATION = "destination"; String CORRELATION_ID = "correlation-id"; String EXPIRATION_TIME = "expires"; @@ -159,6 +161,7 @@ public interface Stomp { public interface Ack { String MESSAGE_ID = "message-id"; String SUBSCRIPTION = "subscription"; + String ACK_ID = "id"; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java index 7b2a429d2a..9e267acb2f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java @@ -32,10 +32,10 @@ public class StompQueueBrowserSubscription extends StompSubscription { } @Override - void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { + void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { if (md.getMessage() != null) { - super.onMessageDispatch(md); + super.onMessageDispatch(md, ackId); } else { StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE); browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 5f2f0a930c..714bce914b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.transport.stomp; -import org.apache.activemq.command.*; - -import javax.jms.JMSException; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; @@ -26,6 +23,17 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.TransactionId; + /** * Keeps track of the STOMP subscription so that acking is correctly done. * @@ -55,7 +63,7 @@ public class StompSubscription { this.transformation = transformation; } - void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { + void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); if (ackMode == CLIENT_ACK) { synchronized (this) { @@ -73,7 +81,7 @@ public class StompSubscription { boolean ignoreTransformation = false; if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) { - message.setReadOnlyProperties(false); + message.setReadOnlyProperties(false); message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); } else { if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) { @@ -88,6 +96,10 @@ public class StompSubscription { command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); } + if (ackId != null) { + command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId); + } + protocolConverter.getStompTransport().sendToStomp(command); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index ac83a95a33..1d38bdb0a2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -154,12 +154,23 @@ public class StompWireFormat implements WireFormat { ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength); while ((b = in.readByte()) != '\n') { if (baos.size() > maxLength) { + baos.close(); throw new ProtocolException(errorMessage, true); } baos.write(b); } + baos.close(); - return baos.toByteSequence(); + ByteSequence line = baos.toByteSequence(); + + if (stompVersion.equals(Stomp.V1_0) || stompVersion.equals(Stomp.V1_2)) { + int lineLength = line.getLength(); + if (lineLength > 0 && line.data[lineLength-1] == '\r') { + line.setLength(lineLength-1); + } + } + + return line; } protected String parseAction(DataInput in) throws IOException { @@ -177,6 +188,7 @@ public class StompWireFormat implements WireFormat { } } } + return action; } @@ -206,6 +218,7 @@ public class StompWireFormat implements WireFormat { } ByteSequence nameSeq = stream.toByteSequence(); + String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8"); String value = decodeHeader(headerLine); if (stompVersion.equals(Stomp.V1_0)) { @@ -213,8 +226,11 @@ public class StompWireFormat implements WireFormat { } if (!headers.containsKey(name)) { - headers.put(name, value); + headers.put(name, value); } + + stream.close(); + } catch (Exception e) { throw new ProtocolException("Unable to parser header line [" + line + "]", true); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java new file mode 100644 index 0000000000..7dbf51d550 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; + +public class Stomp12NIOSSLTest extends Stomp12Test { + + protected void setUp() throws Exception { + bindAddress = "stomp+nio+ssl://localhost:61613"; + confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml"; + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + super.setUp(); + } + + protected Socket createSocket(URI connectUri) throws IOException { + SocketFactory factory = SSLSocketFactory.getDefault(); + return factory.createSocket("127.0.0.1", connectUri.getPort()); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java new file mode 100644 index 0000000000..4b7dc9484c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +public class Stomp12NIOTest extends Stomp12Test { + + @Override + protected void setUp() throws Exception { + bindAddress = "stomp+nio://localhost:61612"; + confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml"; + super.setUp(); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java new file mode 100644 index 0000000000..70702e637a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; + + + +/** + * + */ +public class Stomp12SslAuthTest extends Stomp12Test { + + + protected void setUp() throws Exception { + + // Test mutual authentication on both stomp and standard ssl transports + bindAddress = "stomp+ssl://localhost:61612"; + confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml"; + jmsUri="ssl://localhost:61617"; + + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + //System.setProperty("javax.net.debug","ssl,handshake"); + super.setUp(); + } + + protected Socket createSocket(URI connectUri) throws IOException { + SocketFactory factory = SSLSocketFactory.getDefault(); + return factory.createSocket("127.0.0.1", connectUri.getPort()); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java new file mode 100644 index 0000000000..e401d48d49 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +import javax.jms.Connection; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Stomp12Test extends CombinationTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(Stomp12Test.class); + + protected String bindAddress = "stomp://localhost:61613"; + protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; + protected String jmsUri = "vm://localhost"; + + private BrokerService broker; + private StompConnection stompConnection = new StompConnection(); + private Connection connection; + + @Override + protected void setUp() throws Exception { + + broker = BrokerFactory.createBroker(new URI(confUri)); + broker.start(); + broker.waitUntilStarted(); + + stompConnect(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri); + connection = cf.createConnection("system", "manager"); + connection.start(); + } + + private void stompConnect() throws IOException, URISyntaxException, UnknownHostException { + URI connectUri = new URI(bindAddress); + stompConnection.open(createSocket(connectUri)); + } + + protected Socket createSocket(URI connectUri) throws IOException { + return new Socket("127.0.0.1", connectUri.getPort()); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + + @Override + protected void tearDown() throws Exception { + try { + stompDisconnect(); + } catch(Exception e) { + // Some tests explicitly disconnect from stomp so can ignore + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + + private void stompDisconnect() throws IOException { + if (stompConnection != null) { + stompConnection.close(); + stompConnection = null; + } + } + + @Test + public void testTelnetStyleSends() throws Exception { + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "CONNECT\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String send = "SUBSCRIBE\r\n" + + "id:1\r\n" + + "destination:/queue/" + getQueueName() + "\r\n" + + "receipt:1\r\n" + + "\r\n"+ + "\u0000\r\n"; + + stompConnection.sendFrame(send); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(disconnect); + } + + @Test + public void testClientAckWithoutAckId() throws Exception { + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "ack:client\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL; + stompConnection.sendFrame(message); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("1", received.getBody()); + + String frame = "ACK\n" + "message-id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + received = stompConnection.receive(); + assertTrue(received.getAction().equals("ERROR")); + LOG.info("Broker sent: " + received); + + String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(disconnect); + } + + @Test + public void testClientAck() throws Exception { + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "ack:client\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL; + stompConnection.sendFrame(message); + message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL; + stompConnection.sendFrame(message); + + StompFrame received = stompConnection.receive(); + LOG.info("Stomp Message: {}", received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("1", received.getBody()); + + received = stompConnection.receive(); + LOG.info("Stomp Message: {}", received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("2", received.getBody()); + + String frame = "ACK\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "DISCONNECT\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(400); + } catch (InterruptedException e){} + + // reconnect and send some messages to the offline subscribers and then try to get + // them after subscribing again. + stompConnect(); + stompConnection.sendFrame(connect); + frame = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("CONNECTED")); + + stompConnection.sendFrame(subscribe); + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL; + stompConnection.sendFrame(message); + + received = stompConnection.receive(); + LOG.info("Stomp Message: {}", received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("3", received.getBody()); + + frame = "ACK\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(disconnect); + } + + @Test + public void testClientIndividualAck() throws Exception { + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "ack:client-individual\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL; + stompConnection.sendFrame(message); + message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL; + stompConnection.sendFrame(message); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("1", received.getBody()); + + received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("2", received.getBody()); + + String frame = "ACK\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(400); + } catch (InterruptedException e){} + + // reconnect and send some messages to the offline subscribers and then try to get + // them after subscribing again. + stompConnect(); + stompConnection.sendFrame(connect); + frame = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("CONNECTED")); + + stompConnection.sendFrame(subscribe); + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL; + stompConnection.sendFrame(message); + + received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("1", received.getBody()); + + frame = "ACK\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals("3", received.getBody()); + + frame = "ACK\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(disconnect); + } + +}