This closes #1774
This commit is contained in:
commit
00bd989f9f
|
@ -157,7 +157,7 @@ public class StompSession implements SessionCallback {
|
||||||
buffer = coreMessage.getReadOnlyBodyBuffer();
|
buffer = coreMessage.getReadOnlyBodyBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
|
if (Boolean.TRUE.equals(serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
|
||||||
ActiveMQBuffer qbuff = buffer;
|
ActiveMQBuffer qbuff = buffer;
|
||||||
int bytesToRead = qbuff.readerIndex();
|
int bytesToRead = qbuff.readerIndex();
|
||||||
Inflater inflater = new Inflater();
|
Inflater inflater = new Inflater();
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.crossprotocol;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AMQPToStompTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private ActiveMQServer server;
|
||||||
|
protected String queueName = "amqpToStompTestQueue1";
|
||||||
|
private SimpleString coreQueue;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server = createServer(true, true);
|
||||||
|
server.start();
|
||||||
|
server.waitForActivation(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
Configuration serverConfig = server.getConfiguration();
|
||||||
|
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false)
|
||||||
|
.setAutoCreateAddresses(false)
|
||||||
|
.setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
|
||||||
|
serverConfig.setSecurityEnabled(false);
|
||||||
|
coreQueue = new SimpleString(queueName);
|
||||||
|
server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
server.stop();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendAmqpReceiveStomp() throws Exception {
|
||||||
|
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:61616"), null, null);
|
||||||
|
AmqpConnection amqpconnection = client.connect();
|
||||||
|
try {
|
||||||
|
AmqpSession session = amqpconnection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(queueName);
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("mine");
|
||||||
|
sender.send(message);
|
||||||
|
} finally {
|
||||||
|
amqpconnection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(new URI("tcp://127.0.0.1:61616"));
|
||||||
|
conn.connect(null, null);
|
||||||
|
try {
|
||||||
|
StompTestBase.subscribeQueue(conn, null, queueName);
|
||||||
|
ClientStompFrame frame = conn.receiveFrame();
|
||||||
|
assertNotNull(frame);
|
||||||
|
assertNotNull(frame.getBody());
|
||||||
|
assertTrue(frame.getBody().contains("mine"));
|
||||||
|
} finally {
|
||||||
|
conn.closeTransport();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -219,19 +219,19 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getQueueName() {
|
protected static String getQueueName() {
|
||||||
return "testQueue";
|
return "testQueue";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getQueuePrefix() {
|
protected static String getQueuePrefix() {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getTopicName() {
|
protected static String getTopicName() {
|
||||||
return "testtopic";
|
return "testtopic";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getTopicPrefix() {
|
protected static String getTopicPrefix() {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,25 +264,25 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
public static void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
||||||
ClientStompFrame abortFrame = conn.createFrame(Stomp.Commands.ABORT)
|
ClientStompFrame abortFrame = conn.createFrame(Stomp.Commands.ABORT)
|
||||||
.addHeader(Stomp.Headers.TRANSACTION, txID);
|
.addHeader(Stomp.Headers.TRANSACTION, txID);
|
||||||
|
|
||||||
conn.sendFrame(abortFrame);
|
conn.sendFrame(abortFrame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
public static void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
||||||
ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.BEGIN)
|
ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.BEGIN)
|
||||||
.addHeader(Stomp.Headers.TRANSACTION, txID);
|
.addHeader(Stomp.Headers.TRANSACTION, txID);
|
||||||
|
|
||||||
conn.sendFrame(beginFrame);
|
conn.sendFrame(beginFrame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
public static void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
|
||||||
commitTransaction(conn, txID, false);
|
commitTransaction(conn, txID, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commitTransaction(StompClientConnection conn,
|
public static void commitTransaction(StompClientConnection conn,
|
||||||
String txID,
|
String txID,
|
||||||
boolean receipt) throws IOException, InterruptedException {
|
boolean receipt) throws IOException, InterruptedException {
|
||||||
ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.COMMIT)
|
ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.COMMIT)
|
||||||
|
@ -297,7 +297,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void ack(StompClientConnection conn,
|
public static void ack(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
ClientStompFrame messageIdFrame) throws IOException, InterruptedException {
|
ClientStompFrame messageIdFrame) throws IOException, InterruptedException {
|
||||||
String messageID = messageIdFrame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
|
String messageID = messageIdFrame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
|
||||||
|
@ -315,7 +315,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void ack(StompClientConnection conn,
|
public static void ack(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String mid,
|
String mid,
|
||||||
String txID) throws IOException, InterruptedException {
|
String txID) throws IOException, InterruptedException {
|
||||||
|
@ -329,7 +329,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void nack(StompClientConnection conn, String subscriptionId, String messageId) throws IOException, InterruptedException {
|
public static void nack(StompClientConnection conn, String subscriptionId, String messageId) throws IOException, InterruptedException {
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.NACK)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.NACK)
|
||||||
.addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
|
.addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
|
||||||
.addHeader(Stomp.Headers.Message.MESSAGE_ID, messageId);
|
.addHeader(Stomp.Headers.Message.MESSAGE_ID, messageId);
|
||||||
|
@ -337,25 +337,25 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId) throws IOException, InterruptedException {
|
String subscriptionId) throws IOException, InterruptedException {
|
||||||
return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack) throws IOException, InterruptedException {
|
String ack) throws IOException, InterruptedException {
|
||||||
return subscribe(conn, subscriptionId, ack, null);
|
return subscribe(conn, subscriptionId, ack, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId) throws IOException, InterruptedException {
|
String durableId) throws IOException, InterruptedException {
|
||||||
return subscribe(conn, subscriptionId, ack, durableId, true);
|
return subscribe(conn, subscriptionId, ack, durableId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -363,7 +363,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return subscribe(conn, subscriptionId, ack, durableId, null, receipt);
|
return subscribe(conn, subscriptionId, ack, durableId, null, receipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -371,7 +371,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return subscribe(conn, subscriptionId, ack, durableId, selector, true);
|
return subscribe(conn, subscriptionId, ack, durableId, selector, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -380,11 +380,11 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
|
return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribeQueue(StompClientConnection conn, String subId, String destination) throws IOException, InterruptedException {
|
public static ClientStompFrame subscribeQueue(StompClientConnection conn, String subId, String destination) throws IOException, InterruptedException {
|
||||||
return subscribe(conn, subId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, destination, true);
|
return subscribe(conn, subId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, destination, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribe(StompClientConnection conn,
|
public static ClientStompFrame subscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -426,14 +426,14 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribeTopic(StompClientConnection conn,
|
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId) throws IOException, InterruptedException {
|
String durableId) throws IOException, InterruptedException {
|
||||||
return subscribeTopic(conn, subscriptionId, ack, durableId, true);
|
return subscribeTopic(conn, subscriptionId, ack, durableId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribeTopic(StompClientConnection conn,
|
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -441,7 +441,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false);
|
return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame subscribeTopic(StompClientConnection conn,
|
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String ack,
|
String ack,
|
||||||
String durableId,
|
String durableId,
|
||||||
|
@ -481,17 +481,17 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException {
|
public static ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException {
|
||||||
return unsubscribe(conn, subscriptionId, null, false, false);
|
return unsubscribe(conn, subscriptionId, null, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame unsubscribe(StompClientConnection conn,
|
public static ClientStompFrame unsubscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
boolean receipt) throws IOException, InterruptedException {
|
boolean receipt) throws IOException, InterruptedException {
|
||||||
return unsubscribe(conn, subscriptionId, null, receipt, false);
|
return unsubscribe(conn, subscriptionId, null, receipt, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame unsubscribe(StompClientConnection conn,
|
public static ClientStompFrame unsubscribe(StompClientConnection conn,
|
||||||
String subscriptionId,
|
String subscriptionId,
|
||||||
String destination,
|
String destination,
|
||||||
boolean receipt,
|
boolean receipt,
|
||||||
|
@ -523,19 +523,19 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body) throws IOException, InterruptedException {
|
public static ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body) throws IOException, InterruptedException {
|
||||||
return send(conn, destination, contentType, body, false);
|
return send(conn, destination, contentType, body, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt) throws IOException, InterruptedException {
|
public static ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt) throws IOException, InterruptedException {
|
||||||
return send(conn, destination, contentType, body, receipt, null);
|
return send(conn, destination, contentType, body, receipt, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, RoutingType destinationType) throws IOException, InterruptedException {
|
public static ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, RoutingType destinationType) throws IOException, InterruptedException {
|
||||||
return send(conn, destination, contentType, body, receipt, destinationType, null);
|
return send(conn, destination, contentType, body, receipt, destinationType, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, RoutingType destinationType, String txId) throws IOException, InterruptedException {
|
public static ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, RoutingType destinationType, String txId) throws IOException, InterruptedException {
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, destination)
|
.addHeader(Stomp.Headers.Send.DESTINATION, destination)
|
||||||
.setBody(body);
|
.setBody(body);
|
||||||
|
@ -573,7 +573,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
|
|
||||||
public URI createStompClientUri(String scheme, String hostname, int port) throws URISyntaxException {
|
public static URI createStompClientUri(String scheme, String hostname, int port) throws URISyntaxException {
|
||||||
return new URI(scheme + "://" + hostname + ":" + port);
|
return new URI(scheme + "://" + hostname + ":" + port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1153,7 +1153,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
//ack the last
|
//ack the last
|
||||||
this.ack(conn, "sub1", frame);
|
ack(conn, "sub1", frame);
|
||||||
|
|
||||||
unsubscribe(conn, "sub1");
|
unsubscribe(conn, "sub1");
|
||||||
|
|
||||||
|
@ -1186,7 +1186,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
//ack the 49th
|
//ack the 49th
|
||||||
if (i == num - 2) {
|
if (i == num - 2) {
|
||||||
this.ack(conn, "sub1", frame);
|
ack(conn, "sub1", frame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1252,7 +1252,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
IntegrationTestLogger.LOGGER.info(i + " == received: " + frame);
|
IntegrationTestLogger.LOGGER.info(i + " == received: " + frame);
|
||||||
//ack on even numbers
|
//ack on even numbers
|
||||||
if (i % 2 == 0) {
|
if (i % 2 == 0) {
|
||||||
this.ack(conn, "sub1", frame);
|
ack(conn, "sub1", frame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1279,12 +1279,12 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testTwoSubscribers() throws Exception {
|
public void testTwoSubscribers() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
||||||
|
|
||||||
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
|
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
newConn.connect(defUser, defPass, "myclientid2");
|
newConn.connect(defUser, defPass, "myclientid2");
|
||||||
|
|
||||||
this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
||||||
|
|
||||||
send(newConn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
send(newConn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||||
|
|
||||||
|
@ -1302,8 +1302,8 @@ public class StompV11Test extends StompTestBase {
|
||||||
assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
|
assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
|
||||||
|
|
||||||
// remove suscription
|
// remove suscription
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
this.unsubscribe(newConn, "sub2", true);
|
unsubscribe(newConn, "sub2", true);
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
newConn.disconnect();
|
newConn.disconnect();
|
||||||
|
@ -1318,7 +1318,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection(uri);
|
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
connV11_2.connect(defUser, defPass);
|
connV11_2.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
ClientStompFrame frame = connV11_2.receiveFrame(2000);
|
ClientStompFrame frame = connV11_2.receiveFrame(2000);
|
||||||
|
|
||||||
|
@ -1347,7 +1347,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testBodyWithUTF8() throws Exception {
|
public void testBodyWithUTF8() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
|
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
|
||||||
IntegrationTestLogger.LOGGER.info(text);
|
IntegrationTestLogger.LOGGER.info(text);
|
||||||
|
@ -1366,7 +1366,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testClientAckNotPartOfTransaction() throws Exception {
|
public void testClientAckNotPartOfTransaction() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1381,7 +1381,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
this.ack(conn, getName(), messageID, "tx1");
|
ack(conn, getName(), messageID, "tx1");
|
||||||
|
|
||||||
abortTransaction(conn, "tx1");
|
abortTransaction(conn, "tx1");
|
||||||
|
|
||||||
|
@ -1389,7 +1389,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
assertNull(frame);
|
assertNull(frame);
|
||||||
|
|
||||||
this.unsubscribe(conn, getName());
|
unsubscribe(conn, getName());
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1398,7 +1398,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testDisconnectAndError() throws Exception {
|
public void testDisconnectAndError() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
String uuid = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
@ -1453,9 +1453,9 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testDurableSubscriber() throws Exception {
|
public void testDurableSubscriber() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName(), false);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName(), false);
|
||||||
ClientStompFrame frame = conn.receiveFrame();
|
ClientStompFrame frame = conn.receiveFrame();
|
||||||
|
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
|
||||||
|
@ -1467,7 +1467,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testDurableSubscriberWithReconnection() throws Exception {
|
public void testDurableSubscriberWithReconnection() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
String uuid = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
@ -1487,7 +1487,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
conn = StompClientConnectionFactory.createClientConnection(uri);
|
conn = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
// we must have received the message
|
// we must have received the message
|
||||||
frame = conn.receiveFrame();
|
frame = conn.receiveFrame();
|
||||||
|
@ -1496,7 +1496,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
|
Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
|
||||||
Assert.assertEquals(getName(), frame.getBody());
|
Assert.assertEquals(getName(), frame.getBody());
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1");
|
unsubscribe(conn, "sub1");
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1505,14 +1505,14 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testDurableUnSubscribe() throws Exception {
|
public void testDurableUnSubscribe() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
conn.destroy();
|
conn.destroy();
|
||||||
conn = StompClientConnectionFactory.createClientConnection(uri);
|
conn = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.unsubscribe(conn, getName(), null, false, true);
|
unsubscribe(conn, getName(), null, false, true);
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
||||||
|
@ -1552,7 +1552,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
for (int i = 0; i < ctr; ++i) {
|
for (int i = 0; i < ctr; ++i) {
|
||||||
data[i] = getName() + i;
|
data[i] = getName() + i;
|
||||||
|
@ -1583,7 +1583,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
|
||||||
|
|
||||||
sendJmsMessage("Ignored message", "foo", "1234");
|
sendJmsMessage("Ignored message", "foo", "1234");
|
||||||
sendJmsMessage("Real message", "foo", "zzz");
|
sendJmsMessage("Real message", "foo", "zzz");
|
||||||
|
@ -1599,7 +1599,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testRedeliveryWithClientAck() throws Exception {
|
public void testRedeliveryWithClientAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1834,7 +1834,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeToTopic() throws Exception {
|
public void testSubscribeToTopic() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", null, null, true);
|
subscribeTopic(conn, "sub1", null, null, true);
|
||||||
|
|
||||||
sendJmsMessage(getName(), topic);
|
sendJmsMessage(getName(), topic);
|
||||||
|
|
||||||
|
@ -1844,7 +1844,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getHeader(Stomp.Headers.Message.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
Assert.assertTrue(frame.getHeader(Stomp.Headers.Message.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
||||||
Assert.assertTrue(frame.getBody().equals(getName()));
|
Assert.assertTrue(frame.getBody().equals(getName()));
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
|
|
||||||
sendJmsMessage(getName(), topic);
|
sendJmsMessage(getName(), topic);
|
||||||
|
|
||||||
|
@ -1858,7 +1858,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", null, null, true, true);
|
subscribeTopic(conn, "sub1", null, null, true, true);
|
||||||
|
|
||||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||||
|
|
||||||
|
@ -1875,7 +1875,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getHeader(Stomp.Headers.Message.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
Assert.assertTrue(frame.getHeader(Stomp.Headers.Message.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
||||||
Assert.assertTrue(frame.getBody().equals(getName()));
|
Assert.assertTrue(frame.getBody().equals(getName()));
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1");
|
unsubscribe(conn, "sub1");
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1884,7 +1884,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAck() throws Exception {
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1906,7 +1906,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
byte[] payload = new byte[]{1, 2, 3, 4, 5};
|
byte[] payload = new byte[]{1, 2, 3, 4, 5};
|
||||||
sendJmsMessage(payload, queue);
|
sendJmsMessage(payload, queue);
|
||||||
|
@ -1930,7 +1930,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithClientAck() throws Exception {
|
public void testSubscribeWithClientAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1938,7 +1938,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
assertEquals(getName().length(), Integer.parseInt(frame.getHeader(Stomp.Headers.CONTENT_LENGTH)));
|
assertEquals(getName().length(), Integer.parseInt(frame.getHeader(Stomp.Headers.CONTENT_LENGTH)));
|
||||||
|
|
||||||
this.ack(conn, "sub1", frame);
|
ack(conn, "sub1", frame);
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
|
|
||||||
|
@ -1962,7 +1962,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithID() throws Exception {
|
public void testSubscribeWithID() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1977,7 +1977,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
BytesMessage message = session.createBytesMessage();
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
@ -2015,7 +2015,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
// first tx
|
// first tx
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2024,13 +2024,13 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1");
|
commitTransaction(conn, "tx1");
|
||||||
|
|
||||||
Message message = consumer.receive(1000);
|
Message message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
|
||||||
// 2nd tx with same tx ID
|
// 2nd tx with same tx ID
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
frame = conn.createFrame(Stomp.Commands.SEND)
|
frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2039,7 +2039,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1");
|
commitTransaction(conn, "tx1");
|
||||||
|
|
||||||
message = consumer.receive(1000);
|
message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
@ -2053,7 +2053,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2068,7 +2068,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
// check the message is not committed
|
// check the message is not committed
|
||||||
assertNull(consumer.receive(100));
|
assertNull(consumer.receive(100));
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1", true);
|
commitTransaction(conn, "tx1", true);
|
||||||
|
|
||||||
Message message = consumer.receive(1000);
|
Message message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
@ -2082,7 +2082,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2092,9 +2092,9 @@ public class StompV11Test extends StompTestBase {
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
// rollback first message
|
// rollback first message
|
||||||
this.abortTransaction(conn, "tx1");
|
abortTransaction(conn, "tx1");
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
frame = conn.createFrame(Stomp.Commands.SEND)
|
frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2103,7 +2103,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1", true);
|
commitTransaction(conn, "tx1", true);
|
||||||
|
|
||||||
// only second msg should be received since first msg was rolled back
|
// only second msg should be received since first msg was rolled back
|
||||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
|
@ -2117,7 +2117,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testUnsubscribe() throws Exception {
|
public void testUnsubscribe() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
// send a message to our queue
|
// send a message to our queue
|
||||||
sendJmsMessage("first message");
|
sendJmsMessage("first message");
|
||||||
|
@ -2128,7 +2128,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
||||||
|
|
||||||
// remove suscription
|
// remove suscription
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
|
|
||||||
// send a message to our queue
|
// send a message to our queue
|
||||||
sendJmsMessage("second message");
|
sendJmsMessage("second message");
|
||||||
|
@ -2185,7 +2185,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -2206,7 +2206,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
// message should be received since message was not acknowledged
|
// message should be received since message was not acknowledged
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", null);
|
subscribe(conn, "sub1", null);
|
||||||
|
|
||||||
frame = conn.receiveFrame();
|
frame = conn.receiveFrame();
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
||||||
|
@ -2218,7 +2218,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
conn = StompClientConnectionFactory.createClientConnection(uri);
|
conn = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", null, null, true);
|
subscribe(conn, "sub1", null, null, true);
|
||||||
|
|
||||||
sendJmsMessage("shouldBeNextMessage");
|
sendJmsMessage("shouldBeNextMessage");
|
||||||
|
|
||||||
|
@ -2273,7 +2273,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
public void testSendContentType() throws Exception {
|
public void testSendContentType() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
BytesMessage message = session.createBytesMessage();
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
|
|
@ -1268,12 +1268,12 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testTwoSubscribers() throws Exception {
|
public void testTwoSubscribers() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
||||||
|
|
||||||
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(v11Uri);
|
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(v11Uri);
|
||||||
newConn.connect(defUser, defPass, "myclientid2");
|
newConn.connect(defUser, defPass, "myclientid2");
|
||||||
|
|
||||||
this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
|
||||||
|
|
||||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||||
|
|
||||||
|
@ -1291,8 +1291,8 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
|
Assert.assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
|
||||||
|
|
||||||
// remove suscription
|
// remove suscription
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
this.unsubscribe(newConn, "sub2", true);
|
unsubscribe(newConn, "sub2", true);
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
newConn.disconnect();
|
newConn.disconnect();
|
||||||
|
@ -1307,7 +1307,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
StompClientConnection connV12_2 = StompClientConnectionFactory.createClientConnection(v11Uri);
|
StompClientConnection connV12_2 = StompClientConnectionFactory.createClientConnection(v11Uri);
|
||||||
connV12_2.connect(defUser, defPass);
|
connV12_2.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(connV12_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(connV12_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
ClientStompFrame frame = connV12_2.receiveFrame(2000);
|
ClientStompFrame frame = connV12_2.receiveFrame(2000);
|
||||||
|
|
||||||
|
@ -1336,7 +1336,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testBodyWithUTF8() throws Exception {
|
public void testBodyWithUTF8() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
|
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
|
||||||
System.out.println(text);
|
System.out.println(text);
|
||||||
|
@ -1355,7 +1355,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testClientAckNotPartOfTransaction() throws Exception {
|
public void testClientAckNotPartOfTransaction() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), "client");
|
subscribe(conn, getName(), "client");
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1379,7 +1379,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
|
||||||
this.unsubscribe(conn, getName());
|
unsubscribe(conn, getName());
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1388,7 +1388,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testDisconnectAndError() throws Exception {
|
public void testDisconnectAndError() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, getName(), "client");
|
subscribe(conn, getName(), "client");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame("DISCONNECT");
|
ClientStompFrame frame = conn.createFrame("DISCONNECT");
|
||||||
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1");
|
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1");
|
||||||
|
@ -1441,9 +1441,9 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testDurableSubscriber() throws Exception {
|
public void testDurableSubscriber() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", "client", getName());
|
subscribe(conn, "sub1", "client", getName());
|
||||||
|
|
||||||
ClientStompFrame frame = this.subscribe(conn, "sub1", "client", getName());
|
ClientStompFrame frame = subscribe(conn, "sub1", "client", getName());
|
||||||
|
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
|
||||||
|
|
||||||
|
@ -1455,7 +1455,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testDurableSubscriberWithReconnection() throws Exception {
|
public void testDurableSubscriberWithReconnection() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame("DISCONNECT");
|
ClientStompFrame frame = conn.createFrame("DISCONNECT");
|
||||||
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1");
|
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1");
|
||||||
|
@ -1473,7 +1473,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
// we must have received the message
|
// we must have received the message
|
||||||
frame = conn.receiveFrame();
|
frame = conn.receiveFrame();
|
||||||
|
@ -1482,7 +1482,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
|
Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
|
||||||
Assert.assertEquals(getName(), frame.getBody());
|
Assert.assertEquals(getName(), frame.getBody());
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1");
|
unsubscribe(conn, "sub1");
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1491,14 +1491,14 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testDurableUnSubscribe() throws Exception {
|
public void testDurableUnSubscribe() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
conn.destroy();
|
conn.destroy();
|
||||||
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.unsubscribe(conn, getName(), null, false, true);
|
unsubscribe(conn, getName(), null, false, true);
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
||||||
|
@ -1538,7 +1538,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
for (int i = 0; i < ctr; ++i) {
|
for (int i = 0; i < ctr; ++i) {
|
||||||
data[i] = getName() + i;
|
data[i] = getName() + i;
|
||||||
|
@ -1569,7 +1569,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
|
||||||
|
|
||||||
sendJmsMessage("Ignored message", "foo", "1234");
|
sendJmsMessage("Ignored message", "foo", "1234");
|
||||||
sendJmsMessage("Real message", "foo", "zzz");
|
sendJmsMessage("Real message", "foo", "zzz");
|
||||||
|
@ -1585,7 +1585,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testRedeliveryWithClientAck() throws Exception {
|
public void testRedeliveryWithClientAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "subId", "client");
|
subscribe(conn, "subId", "client");
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1825,7 +1825,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeToTopic() throws Exception {
|
public void testSubscribeToTopic() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", null, null, true);
|
subscribeTopic(conn, "sub1", null, null, true);
|
||||||
|
|
||||||
sendJmsMessage(getName(), topic);
|
sendJmsMessage(getName(), topic);
|
||||||
|
|
||||||
|
@ -1835,7 +1835,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
Assert.assertTrue(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
||||||
Assert.assertTrue(frame.getBody().equals(getName()));
|
Assert.assertTrue(frame.getBody().equals(getName()));
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
|
|
||||||
sendJmsMessage(getName(), topic);
|
sendJmsMessage(getName(), topic);
|
||||||
|
|
||||||
|
@ -1849,7 +1849,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribeTopic(conn, "sub1", null, null, true, true);
|
subscribeTopic(conn, "sub1", null, null, true, true);
|
||||||
|
|
||||||
// send a message on the same connection => it should not be received
|
// send a message on the same connection => it should not be received
|
||||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||||
|
@ -1867,7 +1867,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
Assert.assertTrue(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION).equals(getTopicPrefix() + getTopicName()));
|
||||||
Assert.assertTrue(frame.getBody().equals(getName()));
|
Assert.assertTrue(frame.getBody().equals(getName()));
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1");
|
unsubscribe(conn, "sub1");
|
||||||
|
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -1876,7 +1876,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAck() throws Exception {
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1898,7 +1898,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
byte[] payload = new byte[]{1, 2, 3, 4, 5};
|
byte[] payload = new byte[]{1, 2, 3, 4, 5};
|
||||||
sendJmsMessage(payload, queue);
|
sendJmsMessage(payload, queue);
|
||||||
|
@ -1922,7 +1922,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithClientAck() throws Exception {
|
public void testSubscribeWithClientAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", "client");
|
subscribe(conn, "sub1", "client");
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1952,7 +1952,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithID() throws Exception {
|
public void testSubscribeWithID() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -1967,7 +1967,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
BytesMessage message = session.createBytesMessage();
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
@ -2005,7 +2005,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
// first tx
|
// first tx
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2014,13 +2014,13 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1");
|
commitTransaction(conn, "tx1");
|
||||||
|
|
||||||
Message message = consumer.receive(1000);
|
Message message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
|
||||||
// 2nd tx with same tx ID
|
// 2nd tx with same tx ID
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
frame = conn.createFrame(Stomp.Commands.SEND)
|
frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2029,7 +2029,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1");
|
commitTransaction(conn, "tx1");
|
||||||
|
|
||||||
message = consumer.receive(1000);
|
message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
@ -2043,7 +2043,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2058,7 +2058,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
// check the message is not committed
|
// check the message is not committed
|
||||||
Assert.assertNull(consumer.receive(100));
|
Assert.assertNull(consumer.receive(100));
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1", true);
|
commitTransaction(conn, "tx1", true);
|
||||||
|
|
||||||
Message message = consumer.receive(1000);
|
Message message = consumer.receive(1000);
|
||||||
Assert.assertNotNull("Should have received a message", message);
|
Assert.assertNotNull("Should have received a message", message);
|
||||||
|
@ -2072,7 +2072,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2082,9 +2082,9 @@ public class StompV12Test extends StompTestBase {
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
// rollback first message
|
// rollback first message
|
||||||
this.abortTransaction(conn, "tx1");
|
abortTransaction(conn, "tx1");
|
||||||
|
|
||||||
this.beginTransaction(conn, "tx1");
|
beginTransaction(conn, "tx1");
|
||||||
|
|
||||||
frame = conn.createFrame(Stomp.Commands.SEND)
|
frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
@ -2093,7 +2093,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
conn.sendFrame(frame);
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
this.commitTransaction(conn, "tx1", true);
|
commitTransaction(conn, "tx1", true);
|
||||||
|
|
||||||
// only second msg should be received since first msg was rolled back
|
// only second msg should be received since first msg was rolled back
|
||||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
|
@ -2107,7 +2107,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testUnsubscribe() throws Exception {
|
public void testUnsubscribe() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
// send a message to our queue
|
// send a message to our queue
|
||||||
sendJmsMessage("first message");
|
sendJmsMessage("first message");
|
||||||
|
@ -2118,7 +2118,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
||||||
|
|
||||||
// remove suscription
|
// remove suscription
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
|
|
||||||
// send a message to our queue
|
// send a message to our queue
|
||||||
sendJmsMessage("second message");
|
sendJmsMessage("second message");
|
||||||
|
@ -2133,7 +2133,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
public void testDisconnectWithoutUnsubscribe() throws Exception {
|
public void testDisconnectWithoutUnsubscribe() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
// send a message to our queue
|
// send a message to our queue
|
||||||
sendJmsMessage("first message");
|
sendJmsMessage("first message");
|
||||||
|
@ -2157,7 +2157,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertNull("not expected: " + frame, frame);
|
Assert.assertNull("not expected: " + frame, frame);
|
||||||
|
|
||||||
//subscribe again.
|
//subscribe again.
|
||||||
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
|
||||||
// receive message from socket
|
// receive message from socket
|
||||||
frame = conn.receiveFrame();
|
frame = conn.receiveFrame();
|
||||||
|
@ -2168,7 +2168,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
frame = conn.receiveFrame(1000);
|
frame = conn.receiveFrame(1000);
|
||||||
Assert.assertNull("not expected: " + frame, frame);
|
Assert.assertNull("not expected: " + frame, frame);
|
||||||
|
|
||||||
this.unsubscribe(conn, "sub1", true);
|
unsubscribe(conn, "sub1", true);
|
||||||
|
|
||||||
frame = conn.receiveFrame(1000);
|
frame = conn.receiveFrame(1000);
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
@ -2179,7 +2179,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", "client");
|
subscribe(conn, "sub1", "client");
|
||||||
|
|
||||||
sendJmsMessage(getName());
|
sendJmsMessage(getName());
|
||||||
|
|
||||||
|
@ -2200,7 +2200,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
// message should be received since message was not acknowledged
|
// message should be received since message was not acknowledged
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", null);
|
subscribe(conn, "sub1", null);
|
||||||
|
|
||||||
frame = conn.receiveFrame();
|
frame = conn.receiveFrame();
|
||||||
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
|
||||||
|
@ -2212,7 +2212,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
this.subscribe(conn, "sub1", null, null, true);
|
subscribe(conn, "sub1", null, null, true);
|
||||||
|
|
||||||
sendJmsMessage("shouldBeNextMessage");
|
sendJmsMessage("shouldBeNextMessage");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue