fix for: https://issues.apache.org/jira/browse/AMQ-4129 Adds support for the STOMP v1.2 spec changes.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1403869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-10-30 21:09:15 +00:00
parent 2f258702f1
commit 6d08aca024
10 changed files with 649 additions and 28 deletions

View File

@ -41,7 +41,6 @@ import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
*/
public class LegacyFrameTranslator implements FrameTranslator {
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map<?, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
@ -59,6 +58,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -83,6 +83,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}

View File

@ -110,6 +110,9 @@ public class ProtocolConverter {
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final StompTransport stompTransport;
private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
@ -121,6 +124,33 @@ public class ProtocolConverter {
private long hbWriteInterval;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
private static class AckEntry {
private String messageId;
private StompSubscription subscription;
public AckEntry(String messageId, StompSubscription subscription) {
this.messageId = messageId;
this.subscription = subscription;
}
public MessageAck onMessageAck(TransactionId transactionId) {
return subscription.onStompMessageAck(messageId, transactionId);
}
public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
return subscription.onStompMessageNack(messageId, transactionId);
}
public String getMessageId() {
return this.messageId;
}
public StompSubscription getSubscription() {
return this.subscription;
}
}
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.brokerContext = brokerContext;
@ -301,15 +331,20 @@ public class ProtocolConverter {
Map<String, String> headers = command.getHeaders();
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
if (subscriptionId == null) {
if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("NACK received without a subscription id for acknowledge!");
}
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) {
if (messageId == null && !this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("NACK received without a message-id to acknowledge!");
}
String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
if (ackId == null && this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("NACK received without an ack header to acknowledge!");
}
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@ -319,17 +354,32 @@ public class ProtocolConverter {
}
}
if (subscriptionId != null) {
boolean nacked = false;
if (ackId != null) {
AckEntry pendingAck = this.pedingAcks.get(ackId);
if (pendingAck != null) {
messageId = pendingAck.getMessageId();
MessageAck ack = pendingAck.onMessageNack(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
nacked = true;
}
}
} else if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
} else {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
nacked = true;
}
}
}
if (!nacked) {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
}
}
protected void onStompAck(StompFrame command) throws ProtocolException {
@ -337,15 +387,20 @@ public class ProtocolConverter {
Map<String, String> headers = command.getHeaders();
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) {
if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
throw new ProtocolException("ACK received without a message-id to acknowledge!");
}
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
throw new ProtocolException("ACK received without a subscription id for acknowledge!");
}
String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
if (ackId == null && this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("ACK received without a ack id for acknowledge!");
}
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@ -357,7 +412,19 @@ public class ProtocolConverter {
boolean acked = false;
if (subscriptionId != null) {
if (ackId != null) {
AckEntry pendingAck = this.pedingAcks.get(ackId);
if (pendingAck != null) {
messageId = pendingAck.getMessageId();
MessageAck ack = pendingAck.onMessageAck(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
}
}
} else if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
@ -370,7 +437,7 @@ public class ProtocolConverter {
} else {
// TODO: acking with just a message id is very bogus since the same message id
// STOMP v1.0: acking with just a message id is very bogus since the same message id
// could have been sent to 2 different subscriptions on the same Stomp connection.
// For example, when 2 subs are created on the same topic.
@ -505,8 +572,8 @@ public class ProtocolConverter {
}
String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
if( selector!=null ) {
consumerInfo.setSelector("convert_string_expressions:"+selector);
if (selector != null) {
consumerInfo.setSelector("convert_string_expressions:" + selector);
}
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
@ -606,6 +673,7 @@ public class ProtocolConverter {
if (this.version.equals(Stomp.V1_1)) {
clientId = connectionInfo.getClientId();
}
if (durable != null) {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
@ -733,7 +801,6 @@ public class ProtocolConverter {
}
}
});
}
});
}
@ -775,7 +842,19 @@ public class ProtocolConverter {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
sub.onMessageDispatch(md);
String ackId = null;
if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO) {
AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
ackId = this.ACK_ID_GENERATOR.generateId();
this.pedingAcks.put(ackId, pendingAck);
}
try {
sub.onMessageDispatch(md, ackId);
} catch (Exception ex) {
if (ackId != null) {
this.pedingAcks.remove(ackId);
}
}
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
@ -846,15 +925,11 @@ public class ProtocolConverter {
}
try {
StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
monitor.setReadCheckTime(hbReadInterval);
monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
monitor.setWriteCheckTime(hbWriteInterval);
monitor.startMonitoring();
} catch(Exception ex) {
hbReadInterval = 0;
hbWriteInterval = 0;

View File

@ -32,11 +32,12 @@ public interface Stomp {
String COMMA = ",";
String V1_0 = "1.0";
String V1_1 = "1.1";
String V1_2 = "1.2";
String DEFAULT_HEART_BEAT = "0,0";
String DEFAULT_VERSION = "1.0";
String EMPTY = "";
String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.2", "1.1", "1.0"};
String TEXT_PLAIN = "text/plain";
String TRUE = "true";
@ -100,6 +101,7 @@ public interface Stomp {
public interface Message {
String MESSAGE_ID = "message-id";
String ACK_ID = "ack";
String DESTINATION = "destination";
String CORRELATION_ID = "correlation-id";
String EXPIRATION_TIME = "expires";
@ -159,6 +161,7 @@ public interface Stomp {
public interface Ack {
String MESSAGE_ID = "message-id";
String SUBSCRIPTION = "subscription";
String ACK_ID = "id";
}
}

View File

@ -32,10 +32,10 @@ public class StompQueueBrowserSubscription extends StompSubscription {
}
@Override
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
if (md.getMessage() != null) {
super.onMessageDispatch(md);
super.onMessageDispatch(md, ackId);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.transport.stomp;
import org.apache.activemq.command.*;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -26,6 +23,17 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
*
@ -55,7 +63,7 @@ public class StompSubscription {
this.transformation = transformation;
}
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (ackMode == CLIENT_ACK) {
synchronized (this) {
@ -73,7 +81,7 @@ public class StompSubscription {
boolean ignoreTransformation = false;
if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
message.setReadOnlyProperties(false);
message.setReadOnlyProperties(false);
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
} else {
if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
@ -88,6 +96,10 @@ public class StompSubscription {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
if (ackId != null) {
command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
}
protocolConverter.getStompTransport().sendToStomp(command);
}

View File

@ -154,12 +154,23 @@ public class StompWireFormat implements WireFormat {
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
if (baos.size() > maxLength) {
baos.close();
throw new ProtocolException(errorMessage, true);
}
baos.write(b);
}
baos.close();
return baos.toByteSequence();
ByteSequence line = baos.toByteSequence();
if (stompVersion.equals(Stomp.V1_0) || stompVersion.equals(Stomp.V1_2)) {
int lineLength = line.getLength();
if (lineLength > 0 && line.data[lineLength-1] == '\r') {
line.setLength(lineLength-1);
}
}
return line;
}
protected String parseAction(DataInput in) throws IOException {
@ -177,6 +188,7 @@ public class StompWireFormat implements WireFormat {
}
}
}
return action;
}
@ -206,6 +218,7 @@ public class StompWireFormat implements WireFormat {
}
ByteSequence nameSeq = stream.toByteSequence();
String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8");
String value = decodeHeader(headerLine);
if (stompVersion.equals(Stomp.V1_0)) {
@ -213,8 +226,11 @@ public class StompWireFormat implements WireFormat {
}
if (!headers.containsKey(name)) {
headers.put(name, value);
headers.put(name, value);
}
stream.close();
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
public class Stomp12NIOSSLTest extends Stomp12Test {
protected void setUp() throws Exception {
bindAddress = "stomp+nio+ssl://localhost:61613";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
public class Stomp12NIOTest extends Stomp12Test {
@Override
protected void setUp() throws Exception {
bindAddress = "stomp+nio://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
super.setUp();
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
/**
*
*/
public class Stomp12SslAuthTest extends Stomp12Test {
protected void setUp() throws Exception {
// Test mutual authentication on both stomp and standard ssl transports
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
jmsUri="ssl://localhost:61617";
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug","ssl,handshake");
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
}
}

View File

@ -0,0 +1,387 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stomp12Test extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(Stomp12Test.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
@Override
protected void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
broker.waitUntilStarted();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
connection = cf.createConnection("system", "manager");
connection.start();
}
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
@Override
protected void tearDown() throws Exception {
try {
stompDisconnect();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
}
@Test
public void testTelnetStyleSends() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "CONNECT\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String send = "SUBSCRIBE\r\n" +
"id:1\r\n" +
"destination:/queue/" + getQueueName() + "\r\n" +
"receipt:1\r\n" +
"\r\n"+
"\u0000\r\n";
stompConnection.sendFrame(send);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
public void testClientAckWithoutAckId() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"ack:client\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
stompConnection.sendFrame(message);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
String frame = "ACK\n" + "message-id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
public void testClientAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"ack:client\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
stompConnection.sendFrame(message);
message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
stompConnection.sendFrame(message);
StompFrame received = stompConnection.receive();
LOG.info("Stomp Message: {}", received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
received = stompConnection.receive();
LOG.info("Stomp Message: {}", received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("2", received.getBody());
String frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
Thread.sleep(400);
} catch (InterruptedException e){}
// reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again.
stompConnect();
stompConnection.sendFrame(connect);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("CONNECTED"));
stompConnection.sendFrame(subscribe);
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
stompConnection.sendFrame(message);
received = stompConnection.receive();
LOG.info("Stomp Message: {}", received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("3", received.getBody());
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
public void testClientIndividualAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"ack:client-individual\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
stompConnection.sendFrame(message);
message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
stompConnection.sendFrame(message);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("2", received.getBody());
String frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
Thread.sleep(400);
} catch (InterruptedException e){}
// reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again.
stompConnect();
stompConnection.sendFrame(connect);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("CONNECTED"));
stompConnection.sendFrame(subscribe);
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
stompConnection.sendFrame(message);
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("3", received.getBody());
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
}