ARTEMIS-1511 Update tests to use StompTest Client + Fix issues

This commit is contained in:
Martyn Taylor 2017-11-10 12:34:40 +00:00 committed by Clebert Suconic
parent c6e5163a51
commit a5c443afb0
12 changed files with 548 additions and 393 deletions

View File

@ -40,6 +40,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -99,7 +100,10 @@ public class StompPluginTest extends StompTestBase {
public void testSendAndReceive() throws Exception { public void testSendAndReceive() throws Exception {
// subscribe // subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); //StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
try {
URI uri = new URI("ws+v12.stomp://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -125,6 +129,10 @@ public class StompPluginTest extends StompTestBase {
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER); AFTER_DELIVER);
} catch (Throwable e) {
e.printStackTrace();
}
} }
} }

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.tests.integration.stomp; package org.apache.activemq.artemis.tests.integration.stomp;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@ -24,16 +27,24 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class FQQNStompTest extends StompTestBase { public class FQQNStompTest extends StompTestBase {
private StompClientConnection conn; private StompClientConnection conn;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, {"tcp+v12.stomp"}});
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
QueueQueryResult result = server.getActiveMQServer().queueQuery(new SimpleString(getQueueName())); QueueQueryResult result = server.getActiveMQServer().queueQuery(new SimpleString(getQueueName()));
assertTrue(result.isExists()); assertTrue(result.isExists());
System.out.println("address: " + result.getAddress() + " queue " + result.getName()); System.out.println("address: " + result.getAddress() + " queue " + result.getName());
@ -51,6 +62,7 @@ public class FQQNStompTest extends StompTestBase {
} }
} }
} finally { } finally {
conn.closeTransport();
super.tearDown(); super.tearDown();
} }
} }
@ -83,21 +95,20 @@ public class FQQNStompTest extends StompTestBase {
unsubscribe(conn, "sub-01"); unsubscribe(conn, "sub-01");
//queue:: //queue::
subscribeQueue(conn, "sub-01", getQueueName() + "\\c\\c"); frame = subscribeQueue(conn, "sub-01", getQueueName() + "\\c\\c");
sendJmsMessage("Hello World!");
frame = conn.receiveFrame(2000);
assertNotNull(frame); assertNotNull(frame);
assertEquals("ERROR", frame.getCommand()); assertEquals("ERROR", frame.getCommand());
assertTrue(frame.getBody().contains(getQueueName())); assertTrue(frame.getBody().contains(getQueueName()));
assertTrue(frame.getBody().contains("not exist")); assertTrue(frame.getBody().contains("not exist"));
conn.closeTransport();
//need reconnect because stomp disconnect on error //need reconnect because stomp disconnect on error
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass);
//:: will subscribe to no queue so no message received. //:: will subscribe to no queue so no message received.
subscribeQueue(conn, "sub-01", "\\c\\c"); frame = subscribeQueue(conn, "sub-01", "\\c\\c");
sendJmsMessage("Hello World!"); assertTrue(frame.getBody().contains("Queue :: does not exist"));
frame = conn.receiveFrame(2000);
assertNull(frame);
} }
} }

View File

@ -23,6 +23,7 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -66,7 +67,10 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class StompTest extends StompTestBase { public class StompTest extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -76,7 +80,7 @@ public class StompTest extends StompTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
} }
@Override @Override
@ -94,6 +98,7 @@ public class StompTest extends StompTestBase {
} }
} finally { } finally {
super.tearDown(); super.tearDown();
conn.closeTransport();
} }
} }
@ -101,8 +106,10 @@ public class StompTest extends StompTestBase {
public void testConnectionTTL() throws Exception { public void testConnectionTTL() throws Exception {
int port = 61614; int port = 61614;
URI uri = createStompClientUri(scheme, hostname, port);
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect("brianm", "wombats"); conn.connect("brianm", "wombats");
Thread.sleep(5000); Thread.sleep(5000);
@ -257,33 +264,6 @@ public class StompTest extends StompTestBase {
clientProvider.disconnect(); clientProvider.disconnect();
} }
@Test
public void testSendReceiveLargeMessage() throws Exception {
String address = "testLargeMessageAddress";
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);
// STOMP default is UTF-8 == 1 byte per char.
int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
StringBuilder b = new StringBuilder(largeMessageStringSize);
for (int i = 0; i < largeMessageStringSize; i++) {
b.append('t');
}
String payload = b.toString();
// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);
// Send Large Message
System.out.println("Sending Message Size: " + largeMessageStringSize);
send(conn, address, null, payload);
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
System.out.println(frame.getBody().length());
assertTrue(frame.getBody().equals(payload));
}
@Test @Test
public void sendMQTTReceiveSTOMP() throws Exception { public void sendMQTTReceiveSTOMP() throws Exception {
String payload = "This is a test message"; String payload = "This is a test message";
@ -936,10 +916,10 @@ public class StompTest extends StompTestBase {
if (sendDisconnect) { if (sendDisconnect) {
conn.disconnect(); conn.disconnect();
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
} else { } else {
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
} }
// message should be received since message was not acknowledged // message should be received since message was not acknowledged
@ -953,7 +933,7 @@ public class StompTest extends StompTestBase {
conn.disconnect(); conn.disconnect();
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
// now let's make sure we don't see the message again // now let's make sure we don't see the message again
@ -1219,7 +1199,7 @@ public class StompTest extends StompTestBase {
sendJmsMessage(getName(), topic); sendJmsMessage(getName(), topic);
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, "myclientid"); conn.connect(defUser, defPass, "myclientid");
subscribeTopic(conn, null, null, getName()); subscribeTopic(conn, null, null, getName());
@ -1257,7 +1237,7 @@ public class StompTest extends StompTestBase {
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, "myclientid"); conn.connect(defUser, defPass, "myclientid");
unsubscribe(conn, getName(), getTopicPrefix() + getTopicName(), false, true); unsubscribe(conn, getName(), getTopicPrefix() + getTopicName(), false, true);
@ -1302,7 +1282,7 @@ public class StompTest extends StompTestBase {
conn.destroy(); conn.destroy();
// connect again // connect again
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
// send a receipted message to the topic // send a receipted message to the topic
@ -1441,12 +1421,15 @@ public class StompTest extends StompTestBase {
public void testPrefix(final String prefix, final RoutingType routingType, final boolean send) throws Exception { public void testPrefix(final String prefix, final RoutingType routingType, final boolean send) throws Exception {
int port = 61614; int port = 61614;
URI uri = createStompClientUri(scheme, hostname, port);
final String ADDRESS = UUID.randomUUID().toString(); final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS; final String PREFIXED_ADDRESS = prefix + ADDRESS;
String param = routingType.toString(); String param = routingType.toString();
String urlParam = param.toLowerCase() + "Prefix"; String urlParam = param.toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
// since this queue doesn't exist the broker should create a new address using the routing type matching the prefix // since this queue doesn't exist the broker should create a new address using the routing type matching the prefix
@ -1496,11 +1479,14 @@ public class StompTest extends StompTestBase {
public void testPrefixedSendAndRecieve(final String prefix, RoutingType routingType) throws Exception { public void testPrefixedSendAndRecieve(final String prefix, RoutingType routingType) throws Exception {
int port = 61614; int port = 61614;
URI uri = createStompClientUri(scheme, hostname, port);
final String ADDRESS = UUID.randomUUID().toString(); final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS; final String PREFIXED_ADDRESS = prefix + ADDRESS;
String urlParam = routingType.toString().toLowerCase() + "Prefix"; String urlParam = routingType.toString().toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();

View File

@ -26,7 +26,11 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -62,9 +66,22 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext; import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before; import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public abstract class StompTestBase extends ActiveMQTestBase { public abstract class StompTestBase extends ActiveMQTestBase {
@Parameterized.Parameter
public String scheme;
protected URI uri;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v10.stomp"}, {"tcp+v10.stomp"}});
}
protected String hostname = "127.0.0.1"; protected String hostname = "127.0.0.1";
protected final int port = 61613; protected final int port = 61613;
@ -120,8 +137,13 @@ public abstract class StompTestBase extends ActiveMQTestBase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
uri = new URI(scheme + "://" + hostname + ":" + port);
server = createServer(); server = createServer();
server.start(); server.start();
waitForServerToStart(server.getActiveMQServer());
connectionFactory = createConnectionFactory(); connectionFactory = createConnectionFactory();
((ActiveMQConnectionFactory)connectionFactory).setCompressLargeMessage(isCompressLargeMessages()); ((ActiveMQConnectionFactory)connectionFactory).setCompressLargeMessage(isCompressLargeMessages());
@ -330,7 +352,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String subscriptionId, String subscriptionId,
String ack, String ack,
String durableId) throws IOException, InterruptedException { String durableId) throws IOException, InterruptedException {
return subscribe(conn, subscriptionId, ack, durableId, false); return subscribe(conn, subscriptionId, ack, durableId, true);
} }
public ClientStompFrame subscribe(StompClientConnection conn, public ClientStompFrame subscribe(StompClientConnection conn,
@ -346,7 +368,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String ack, String ack,
String durableId, String durableId,
String selector) throws IOException, InterruptedException { String selector) throws IOException, InterruptedException {
return subscribe(conn, subscriptionId, ack, durableId, selector, false); return subscribe(conn, subscriptionId, ack, durableId, selector, true);
} }
public ClientStompFrame subscribe(StompClientConnection conn, public ClientStompFrame subscribe(StompClientConnection conn,
@ -358,8 +380,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt); return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
} }
public void subscribeQueue(StompClientConnection conn, String subId, String destination) throws IOException, InterruptedException { public ClientStompFrame subscribeQueue(StompClientConnection conn, String subId, String destination) throws IOException, InterruptedException {
subscribe(conn, subId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, destination, false); return subscribe(conn, subId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, destination, true);
} }
public ClientStompFrame subscribe(StompClientConnection conn, public ClientStompFrame subscribe(StompClientConnection conn,
@ -384,6 +406,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
if (selector != null) { if (selector != null) {
frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector); frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
} }
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
if (receipt) { if (receipt) {
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
@ -391,6 +414,11 @@ public abstract class StompTestBase extends ActiveMQTestBase {
frame = conn.sendFrame(frame); frame = conn.sendFrame(frame);
// Return Error Frame back to the client
if (frame != null && frame.getCommand().equals("ERROR")) {
return frame;
}
if (receipt) { if (receipt) {
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
} }
@ -402,7 +430,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String subscriptionId, String subscriptionId,
String ack, String ack,
String durableId) throws IOException, InterruptedException { String durableId) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, false); return subscribeTopic(conn, subscriptionId, ack, durableId, true);
} }
public ClientStompFrame subscribeTopic(StompClientConnection conn, public ClientStompFrame subscribeTopic(StompClientConnection conn,
@ -441,6 +469,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
frame = conn.sendFrame(frame); frame = conn.sendFrame(frame);
if (frame.getCommand().equals("ERROR")) {
return frame;
}
if (receipt) { if (receipt) {
assertNotNull("Requested receipt, but response is null", frame); assertNotNull("Requested receipt, but response is null", frame);
assertTrue(frame.getHeader(Stomp.Headers.Response.RECEIPT_ID).equals(uuid)); assertTrue(frame.getHeader(Stomp.Headers.Response.RECEIPT_ID).equals(uuid));
@ -536,4 +568,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
return frame; return frame;
} }
public URI createStompClientUri(String scheme, String hostname, int port) throws URISyntaxException {
return new URI(scheme + "://" + hostname + ":" + port);
}
} }

View File

@ -24,13 +24,21 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.felix.resolver.util.ArrayMap; import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.Parameterized;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class StompTestPropertiesInterceptor extends StompTestBase { public class StompTestPropertiesInterceptor extends StompTestBase {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, {"tcp+v12.stomp"}});
}
@Override @Override
public List<String> getIncomingInterceptors() { public List<String> getIncomingInterceptors() {
List<String> stompIncomingInterceptor = new ArrayList<>(); List<String> stompIncomingInterceptor = new ArrayList<>();
@ -73,7 +81,7 @@ public class StompTestPropertiesInterceptor extends StompTestBase {
expectedProperties.put(MESSAGE_TEXT, msgText); expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(MY_HEADER, myHeader); expectedProperties.put(MY_HEADER, myHeader);
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");

View File

@ -62,7 +62,7 @@ public class StompTestWithInterceptors extends StompTestBase {
// So we clear them here // So we clear them here
MyCoreInterceptor.incomingInterceptedFrames.clear(); MyCoreInterceptor.incomingInterceptedFrames.clear();
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");

View File

@ -16,19 +16,34 @@
*/ */
package org.apache.activemq.artemis.tests.integration.stomp; package org.apache.activemq.artemis.tests.integration.stomp;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Ignore
public class StompTestWithLargeMessages extends StompTestBase { public class StompTestWithLargeMessages extends StompTestBase {
// Web Socket has max frame size of 64kb. Large message tests only available over TCP.
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"tcp+v10.stomp"}, {"tcp+v12.stomp"}});
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -50,10 +65,39 @@ public class StompTestWithLargeMessages extends StompTestBase {
return 2048; return 2048;
} }
@Test
public void testSendReceiveLargeMessage() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
String address = "testLargeMessageAddress";
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);
// STOMP default is UTF-8 == 1 byte per char.
int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
StringBuilder b = new StringBuilder(largeMessageStringSize);
for (int i = 0; i < largeMessageStringSize; i++) {
b.append('t');
}
String payload = b.toString();
// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);
// Send Large Message
System.out.println("Sending Message Size: " + largeMessageStringSize);
send(conn, address, null, payload);
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
System.out.println(frame.getBody().length());
assertTrue(frame.getBody().equals(payload));
}
//stomp sender -> large -> stomp receiver //stomp sender -> large -> stomp receiver
@Test @Test
public void testSendReceiveLargePersistentMessages() throws Exception { public void testSendReceiveLargePersistentMessages() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
int count = 10; int count = 10;
@ -101,7 +145,7 @@ public class StompTestWithLargeMessages extends StompTestBase {
//core sender -> large -> stomp receiver //core sender -> large -> stomp receiver
@Test @Test
public void testReceiveLargePersistentMessagesFromCore() throws Exception { public void testReceiveLargePersistentMessagesFromCore() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@ -142,103 +186,103 @@ public class StompTestWithLargeMessages extends StompTestBase {
conn.disconnect(); conn.disconnect();
} }
//stomp v12 sender -> large -> stomp v12 receiver // //stomp v12 sender -> large -> stomp v12 receiver
@Test // @Test
public void testSendReceiveLargePersistentMessagesV12() throws Exception { // public void testSendReceiveLargePersistentMessagesV12() throws Exception {
StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); // StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
connV12.connect(defUser, defPass); // connV12.connect(defUser, defPass);
//
int count = 10; // int count = 10;
int szBody = 1024 * 1024; // int szBody = 1024 * 1024;
char[] contents = new char[szBody]; // char[] contents = new char[szBody];
for (int i = 0; i < szBody; i++) { // for (int i = 0; i < szBody; i++) {
contents[i] = 'A'; // contents[i] = 'A';
} // }
String body = new String(contents); // String body = new String(contents);
//
ClientStompFrame frame = connV12.createFrame("SEND"); // ClientStompFrame frame = connV12.createFrame("SEND");
frame.addHeader("destination-type", "ANYCAST"); // frame.addHeader("destination-type", "ANYCAST");
frame.addHeader("destination", getQueuePrefix() + getQueueName()); // frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("persistent", "true"); // frame.addHeader("persistent", "true");
frame.setBody(body); // frame.setBody(body);
//
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
connV12.sendFrame(frame); // connV12.sendFrame(frame);
} // }
//
ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); // ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub"); // subFrame.addHeader("id", "a-sub");
subFrame.addHeader("subscription-type", "ANYCAST"); // subFrame.addHeader("subscription-type", "ANYCAST");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); // subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto"); // subFrame.addHeader("ack", "auto");
//
connV12.sendFrame(subFrame); // connV12.sendFrame(subFrame);
//
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
ClientStompFrame receiveFrame = connV12.receiveFrame(30000); // ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
//
Assert.assertNotNull(receiveFrame); // Assert.assertNotNull(receiveFrame);
System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); // System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); // Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); // Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
assertEquals(szBody, receiveFrame.getBody().length()); // assertEquals(szBody, receiveFrame.getBody().length());
} // }
//
// remove susbcription // // remove susbcription
ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); // ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub"); // unsubFrame.addHeader("id", "a-sub");
connV12.sendFrame(unsubFrame); // connV12.sendFrame(unsubFrame);
//
connV12.disconnect(); // connV12.disconnect();
} // }
//
//core sender -> large -> stomp v12 receiver // //core sender -> large -> stomp v12 receiver
@Test // @Test
public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception { // public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception {
int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; // int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
char[] contents = new char[msgSize]; // char[] contents = new char[msgSize];
for (int i = 0; i < msgSize; i++) { // for (int i = 0; i < msgSize; i++) {
contents[i] = 'B'; // contents[i] = 'B';
} // }
String msg = new String(contents); // String msg = new String(contents);
//
int count = 10; // int count = 10;
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
this.sendJmsMessage(msg); // this.sendJmsMessage(msg);
} // }
//
StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); // StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
connV12.connect(defUser, defPass); // connV12.connect(defUser, defPass);
//
ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); // ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub"); // subFrame.addHeader("id", "a-sub");
subFrame.addHeader("subscription-type", "ANYCAST"); // subFrame.addHeader("subscription-type", "ANYCAST");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); // subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto"); // subFrame.addHeader("ack", "auto");
connV12.sendFrame(subFrame); // connV12.sendFrame(subFrame);
//
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
ClientStompFrame receiveFrame = connV12.receiveFrame(30000); // ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
//
Assert.assertNotNull(receiveFrame); // Assert.assertNotNull(receiveFrame);
System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); // System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); // Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); // Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
assertEquals(msgSize, receiveFrame.getBody().length()); // assertEquals(msgSize, receiveFrame.getBody().length());
} // }
//
// remove susbcription // // remove susbcription
ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); // ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub"); // unsubFrame.addHeader("id", "a-sub");
connV12.sendFrame(unsubFrame); // connV12.sendFrame(unsubFrame);
//
connV12.disconnect(); // connV12.disconnect();
} // }
//core sender -> large (compressed regular) -> stomp v10 receiver //core sender -> large (compressed regular) -> stomp v10 receiver
@Test @Test
public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception { public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
@ -281,98 +325,101 @@ public class StompTestWithLargeMessages extends StompTestBase {
conn.disconnect(); conn.disconnect();
} }
//core sender -> large (compressed regular) -> stomp v12 receiver // //core sender -> large (compressed regular) -> stomp v12 receiver
@Test // @Test
public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception { // public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception {
LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); // LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); // LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
//
char[] contents = input.toArray(); // char[] contents = input.toArray();
String msg = new String(contents); // String msg = new String(contents);
//
int count = 10; // int count = 10;
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
this.sendJmsMessage(msg); // this.sendJmsMessage(msg);
} // }
//
StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); // StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
connV12.connect(defUser, defPass); // connV12.connect(defUser, defPass);
//
ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); // ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub"); // subFrame.addHeader("id", "a-sub");
subFrame.addHeader("subscription-type", "ANYCAST"); // subFrame.addHeader("subscription-type", "ANYCAST");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); // subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto"); // subFrame.addHeader("ack", "auto");
//
connV12.sendFrame(subFrame); // connV12.sendFrame(subFrame);
//
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
ClientStompFrame receiveFrame = connV12.receiveFrame(30000); // ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
//
Assert.assertNotNull(receiveFrame); // Assert.assertNotNull(receiveFrame);
System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); // System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); // Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); // Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
assertEquals(contents.length, receiveFrame.getBody().length()); // assertEquals(contents.length, receiveFrame.getBody().length());
} // }
//
// remove susbcription // // remove susbcription
ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); // ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub"); // unsubFrame.addHeader("id", "a-sub");
connV12.sendFrame(unsubFrame); // connV12.sendFrame(unsubFrame);
//
connV12.disconnect(); // connV12.disconnect();
} // }
//
//core sender -> large (compressed large) -> stomp v12 receiver // //core sender -> large (compressed large) -> stomp v12 receiver
@Test // @Test
public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception { // public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception {
LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); // LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); // input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); // LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
//
char[] contents = input.toArray(); // char[] contents = input.toArray();
String msg = new String(contents); // String msg = new String(contents);
//
int count = 10; // int count = 10;
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
this.sendJmsMessage(msg); // this.sendJmsMessage(msg);
} // }
//
IntegrationTestLogger.LOGGER.info("Message count for " + getQueueName() + ": " + server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(getQueueName())).getMessageCount()); // IntegrationTestLogger.LOGGER.info("Message count for " + getQueueName() + ": " + server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(getQueueName())).getMessageCount());
//
StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); // StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
connV12.connect(defUser, defPass); // connV12.connect(defUser, defPass);
//
ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); // ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub"); // subFrame.addHeader("id", "a-sub");
subFrame.addHeader("subscription-type", "ANYCAST"); // subFrame.addHeader("subscription-type", "ANYCAST");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); // subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto"); // subFrame.addHeader("ack", "auto");
//
connV12.sendFrame(subFrame); // connV12.sendFrame(subFrame);
//
for (int i = 0; i < count; i++) { // for (int i = 0; i < count; i++) {
ClientStompFrame receiveFrame = connV12.receiveFrame(30000); // ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
//
Assert.assertNotNull(receiveFrame); // Assert.assertNotNull(receiveFrame);
System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); // System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); // Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); // Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
assertEquals(contents.length, receiveFrame.getBody().length()); // assertEquals(contents.length, receiveFrame.getBody().length());
} // }
//
// remove susbcription // // remove susbcription
ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); // ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub"); // unsubFrame.addHeader("id", "a-sub");
connV12.sendFrame(unsubFrame); // connV12.sendFrame(unsubFrame);
//
connV12.disconnect(); // connV12.disconnect();
} // }
//core sender -> large (compressed large) -> stomp v10 receiver //core sender -> large (compressed large) -> stomp v10 receiver
@Test @Test
public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception { public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
try {
LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@ -387,7 +434,6 @@ public class StompTestWithLargeMessages extends StompTestBase {
this.sendJmsMessage(msg); this.sendJmsMessage(msg);
} }
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
@ -399,6 +445,7 @@ public class StompTestWithLargeMessages extends StompTestBase {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
ClientStompFrame frame = conn.receiveFrame(60000); ClientStompFrame frame = conn.receiveFrame(60000);
Assert.assertNotNull(frame); Assert.assertNotNull(frame);
System.out.println(frame.toString());
System.out.println("part of frame: " + frame.getBody().substring(0, 250)); System.out.println("part of frame: " + frame.getBody().substring(0, 250));
Assert.assertTrue(frame.getCommand().equals("MESSAGE")); Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName())); Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
@ -410,7 +457,10 @@ public class StompTestWithLargeMessages extends StompTestBase {
unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName()); unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
unsubFrame.addHeader("receipt", "567"); unsubFrame.addHeader("receipt", "567");
conn.sendFrame(unsubFrame); conn.sendFrame(unsubFrame);
} finally {
conn.disconnect(); conn.disconnect();
conn.closeTransport();
}
} }
} }

View File

@ -38,7 +38,7 @@ public class StompTestWithMessageID extends StompTestBase {
@Test @Test
public void testEnableMessageID() throws Exception { public void testEnableMessageID() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
ClientStompFrame frame = conn.createFrame("SEND"); ClientStompFrame frame = conn.createFrame("SEND");
@ -74,5 +74,7 @@ public class StompTestWithMessageID extends StompTestBase {
message = (TextMessage) consumer.receive(2000); message = (TextMessage) consumer.receive(2000);
Assert.assertNull(message); Assert.assertNull(message);
conn.disconnect();
} }
} }

View File

@ -38,7 +38,7 @@ public class StompTestWithSecurity extends StompTestBase {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
ClientStompFrame frame = conn.createFrame("SEND"); ClientStompFrame frame = conn.createFrame("SEND");

View File

@ -16,7 +16,10 @@
*/ */
package org.apache.activemq.artemis.tests.integration.stomp.v11; package org.apache.activemq.artemis.tests.integration.stomp.v11;
import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@ -26,15 +29,23 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/* /*
* Some Stomp tests against server with persistence enabled are put here. * Some Stomp tests against server with persistence enabled are put here.
*/ */
@RunWith(Parameterized.class)
public class ExtraStompTest extends StompTestBase { public class ExtraStompTest extends StompTestBase {
private StompClientConnection connV10; private StompClientConnection connV10;
private StompClientConnection connV11; private StompClientConnection connV11;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v11.stomp"}, {"tcp+v11.stomp"}});
}
@Override @Override
public boolean isPersistenceEnabled() { public boolean isPersistenceEnabled() {
return true; return true;
@ -44,9 +55,11 @@ public class ExtraStompTest extends StompTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); URI v10Uri = new URI(uri.toString().replace("v11", "v10"));
connV10 = StompClientConnectionFactory.createClientConnection(v10Uri);
connV10.connect(defUser, defPass); connV10.connect(defUser, defPass);
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11 = StompClientConnectionFactory.createClientConnection(uri);
connV11.connect(defUser, defPass); connV11.connect(defUser, defPass);
} }
@ -181,17 +194,19 @@ public class ExtraStompTest extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
subscribe(conn, "a-sub", Stomp.Headers.Subscribe.AckModeValues.CLIENT); frame = subscribe(conn, "a-sub", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
// receive but don't ack // receive but don't ack
frame = conn.receiveFrame(10000); frame = conn.receiveFrame(10000);
System.out.println(frame);
frame = conn.receiveFrame(10000); frame = conn.receiveFrame(10000);
System.out.println(frame);
unsubscribe(conn, "a-sub"); unsubscribe(conn, "a-sub");
subscribe(conn, "a-sub"); frame = subscribe(conn, "a-sub");
frame = conn.receiveFrame(10000);
frame = conn.receiveFrame(10000); frame = conn.receiveFrame(10000);
//second receive will get problem if trailing bytes //second receive will get problem if trailing bytes

View File

@ -23,20 +23,22 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@ -46,10 +48,13 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/* /*
* *
*/ */
@RunWith(Parameterized.class)
public class StompV11Test extends StompTestBase { public class StompV11Test extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -57,11 +62,19 @@ public class StompV11Test extends StompTestBase {
private StompClientConnection conn; private StompClientConnection conn;
private URI v10Uri;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v11.stomp"}, {"tcp+v11.stomp"}});
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); v10Uri = new URI(uri.toString().replace("v11", "v10"));
conn = StompClientConnectionFactory.createClientConnection(uri);
} }
@Override @Override
@ -75,13 +88,14 @@ public class StompV11Test extends StompTestBase {
} }
} finally { } finally {
super.tearDown(); super.tearDown();
conn.closeTransport();
} }
} }
@Test @Test
public void testConnection() throws Exception { public void testConnection() throws Exception {
server.getActiveMQServer().getConfiguration().setSecurityEnabled(true); server.getActiveMQServer().getConfiguration().setSecurityEnabled(true);
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
connection.connect(defUser, defPass); connection.connect(defUser, defPass);
@ -91,7 +105,7 @@ public class StompV11Test extends StompTestBase {
connection.disconnect(); connection.disconnect();
connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); connection = StompClientConnectionFactory.createClientConnection(uri);
connection.connect(defUser, defPass); connection.connect(defUser, defPass);
@ -101,14 +115,14 @@ public class StompV11Test extends StompTestBase {
connection.disconnect(); connection.disconnect();
connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); connection = StompClientConnectionFactory.createClientConnection(uri);
connection.connect(); connection.connect();
assertFalse(connection.isConnected()); assertFalse(connection.isConnected());
//new way of connection //new way of connection
StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection(uri);
conn.connect1(defUser, defPass); conn.connect1(defUser, defPass);
assertTrue(conn.isConnected()); assertTrue(conn.isConnected());
@ -116,7 +130,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//invalid user //invalid user
conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection(uri);
ClientStompFrame frame = conn.connect("invaliduser", defPass); ClientStompFrame frame = conn.connect("invaliduser", defPass);
assertFalse(conn.isConnected()); assertFalse(conn.isConnected());
assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand())); assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand()));
@ -141,7 +155,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 2 accept-version=1.0, result: 1.0 // case 2 accept-version=1.0, result: 1.0
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0") .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0")
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -158,7 +172,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 3 accept-version=1.1, result: 1.1 // case 3 accept-version=1.1, result: 1.1
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1") .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1")
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -175,7 +189,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 4 accept-version=1.0,1.1,1.2, result 1.1 // case 4 accept-version=1.0,1.1,1.2, result 1.1
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3") .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3")
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -192,7 +206,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 5 accept-version=1.2, result error // case 5 accept-version=1.2, result error
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3") .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3")
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -220,7 +234,7 @@ public class StompV11Test extends StompTestBase {
response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true); response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -254,7 +268,7 @@ public class StompV11Test extends StompTestBase {
send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!"); send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!");
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -289,7 +303,7 @@ public class StompV11Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -330,7 +344,7 @@ public class StompV11Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -365,6 +379,7 @@ public class StompV11Test extends StompTestBase {
frame.addHeader("destination", getQueuePrefix() + getQueueName()); frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain"); frame.addHeader("content-type", "text/plain");
frame.addHeader("content-length", cLen); frame.addHeader("content-length", cLen);
//frame.addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0");
String hKey = "undefined-escape"; String hKey = "undefined-escape";
String hVal = "is\\ttab"; String hVal = "is\\ttab";
frame.addHeader(hKey, hVal); frame.addHeader(hKey, hVal);
@ -403,7 +418,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000 //default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -424,7 +439,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//heart-beat (1,0), should receive a min client ping accepted by server //heart-beat (1,0), should receive a min client ping accepted by server
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -450,7 +465,7 @@ public class StompV11Test extends StompTestBase {
} }
//heart-beat (1,0), start a ping, then send a message, should be ok. //heart-beat (1,0), start a ping, then send a message, should be ok.
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -499,7 +514,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//heart-beat (500,1000) //heart-beat (500,1000)
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -554,7 +569,7 @@ public class StompV11Test extends StompTestBase {
} }
// subscribe // subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -590,7 +605,7 @@ public class StompV11Test extends StompTestBase {
} }
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
try { try {
ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT) ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -647,7 +662,7 @@ public class StompV11Test extends StompTestBase {
} }
// subscribe // subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn = StompClientConnectionFactory.createClientConnection(uri);
frame = newConn.createFrame(Stomp.Commands.CONNECT) frame = newConn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -689,8 +704,10 @@ public class StompV11Test extends StompTestBase {
ClientStompFrame reply; ClientStompFrame reply;
int port = 61614; int port = 61614;
uri = createStompClientUri(scheme, hostname, port);
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start();
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); StompClientConnection connection = StompClientConnectionFactory.createClientConnection(uri);
//no heart beat at all if heat-beat absent //no heart beat at all if heat-beat absent
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
@ -709,14 +726,15 @@ public class StompV11Test extends StompTestBase {
assertEquals(0, connection.getFrameQueueSize()); assertEquals(0, connection.getFrameQueueSize());
try { try {
connection.disconnect(); assertFalse(connection.isConnected());
fail("Channel should be closed here already due to TTL");
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} finally {
connection.closeTransport();
} }
//no heart beat for (0,0) //no heart beat for (0,0)
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -739,14 +757,15 @@ public class StompV11Test extends StompTestBase {
assertEquals(0, connection.getFrameQueueSize()); assertEquals(0, connection.getFrameQueueSize());
try { try {
connection.disconnect(); assertFalse(connection.isConnected());
fail("Channel should be closed here already due to TTL");
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} finally {
connection.closeTransport();
} }
//heart-beat (1,0), should receive a min client ping accepted by server //heart-beat (1,0), should receive a min client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -765,14 +784,15 @@ public class StompV11Test extends StompTestBase {
//now server side should be disconnected because we didn't send ping for 2 sec //now server side should be disconnected because we didn't send ping for 2 sec
//send will fail //send will fail
try { try {
send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); assertFalse(connection.isConnected());
fail("connection should have been destroyed by now"); } catch (Exception e) {
} catch (IOException e) { // ignore
//ignore } finally {
connection.closeTransport();
} }
//heart-beat (1,0), start a ping, then send a message, should be ok. //heart-beat (1,0), start a ping, then send a message, should be ok.
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -801,7 +821,7 @@ public class StompV11Test extends StompTestBase {
connection.disconnect(); connection.disconnect();
//heart-beat (20000,0), should receive a max client ping accepted by server //heart-beat (20000,0), should receive a max client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -820,10 +840,11 @@ public class StompV11Test extends StompTestBase {
//now server side should be disconnected because we didn't send ping for 2 sec //now server side should be disconnected because we didn't send ping for 2 sec
//send will fail //send will fail
try { try {
send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); assertFalse(connection.isConnected());
fail("connection should have been destroyed by now"); } catch (Exception e) {
} catch (IOException e) { // ignore
//ignore } finally {
connection.closeTransport();
} }
} }
@ -836,7 +857,7 @@ public class StompV11Test extends StompTestBase {
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -853,16 +874,15 @@ public class StompV11Test extends StompTestBase {
Thread.sleep(6000); Thread.sleep(6000);
try { try {
connection.disconnect(); assertFalse(connection.isConnected());
fail("Connection should be closed here already due to TTL"); } finally {
} catch (Exception e) { connection.closeTransport();
// ignore
} }
server.getActiveMQServer().getRemotingService().destroyAcceptor("test"); server.getActiveMQServer().getRemotingService().destroyAcceptor("test");
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start(); server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); connection = StompClientConnectionFactory.createClientConnection(uri);
frame = connection.createFrame(Stomp.Commands.CONNECT) frame = connection.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -1151,6 +1171,7 @@ public class StompV11Test extends StompTestBase {
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT); subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
Thread.sleep(1000);
int num = 50; int num = 50;
//send a bunch of messages //send a bunch of messages
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
@ -1175,7 +1196,7 @@ public class StompV11Test extends StompTestBase {
//no messages can be received. //no messages can be received.
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000); Message message = consumer.receive(10000);
Assert.assertNotNull(message); Assert.assertNotNull(message);
message = consumer.receive(1000); message = consumer.receive(1000);
Assert.assertNull(message); Assert.assertNull(message);
@ -1260,21 +1281,21 @@ public class StompV11Test extends StompTestBase {
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass, "myclientid2"); newConn.connect(defUser, defPass, "myclientid2");
this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World"); send(newConn, getTopicPrefix() + getTopicName(), null, "Hello World");
// receive message from socket // receive message from socket
ClientStompFrame frame = conn.receiveFrame(1000); ClientStompFrame frame = conn.receiveFrame(5000);
IntegrationTestLogger.LOGGER.info("received frame : " + frame); IntegrationTestLogger.LOGGER.info("received frame : " + frame);
assertEquals("Hello World", frame.getBody()); assertEquals("Hello World", frame.getBody());
assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
frame = newConn.receiveFrame(1000); frame = newConn.receiveFrame(5000);
IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame); IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame);
assertEquals("Hello World", frame.getBody()); assertEquals("Hello World", frame.getBody());
@ -1294,7 +1315,7 @@ public class StompV11Test extends StompTestBase {
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection(uri);
connV11_2.connect(defUser, defPass); connV11_2.connect(defUser, defPass);
this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
@ -1434,9 +1455,9 @@ public class StompV11Test extends StompTestBase {
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName()); this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName()); this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName(), false);
ClientStompFrame frame = conn.receiveFrame(); ClientStompFrame frame = conn.receiveFrame();
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
conn.disconnect(); conn.disconnect();
@ -1463,7 +1484,7 @@ public class StompV11Test extends StompTestBase {
sendJmsMessage(getName(), topic); sendJmsMessage(getName(), topic);
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, CLIENT_ID); conn.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
@ -1488,7 +1509,7 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, CLIENT_ID); conn.connect(defUser, defPass, CLIENT_ID);
this.unsubscribe(conn, getName(), null, false, true); this.unsubscribe(conn, getName(), null, false, true);
@ -1689,6 +1710,7 @@ public class StompV11Test extends StompTestBase {
@Test @Test
public void testSendMessageWithLeadingNewLine() throws Exception { public void testSendMessageWithLeadingNewLine() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
Thread.sleep(1000);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
@ -2151,7 +2173,7 @@ public class StompV11Test extends StompTestBase {
int size = conn.getServerPingNumber(); int size = conn.getServerPingNumber();
conn.stopPinger(); conn.stopPinger();
((AbstractStompClientConnection)conn).killReaderThread(); //((AbstractStompClientConnection)conn).killReaderThread();
Wait.waitFor(() -> { Wait.waitFor(() -> {
return server.getActiveMQServer().getRemotingService().getConnections().size() == 0; return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
}); });
@ -2175,10 +2197,10 @@ public class StompV11Test extends StompTestBase {
if (sendDisconnect) { if (sendDisconnect) {
conn.disconnect(); conn.disconnect();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
} else { } else {
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
} }
// message should be received since message was not acknowledged // message should be received since message was not acknowledged
@ -2193,7 +2215,7 @@ public class StompV11Test extends StompTestBase {
// now let's make sure we don't see the message again // now let's make sure we don't see the message again
conn.destroy(); conn.destroy();
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
this.subscribe(conn, "sub1", null, null, true); this.subscribe(conn, "sub1", null, null, true);

View File

@ -24,15 +24,18 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@ -45,6 +48,7 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.Parameterized;
/** /**
* Testing Stomp version 1.2 functionalities * Testing Stomp version 1.2 functionalities
@ -56,11 +60,22 @@ public class StompV12Test extends StompTestBase {
private StompClientConnectionV12 conn; private StompClientConnectionV12 conn;
private URI v10Uri;
private URI v11Uri;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, {"tcp+v12.stomp"}});
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); v10Uri = new URI(uri.toString().replace("v12", "v10"));
v11Uri = new URI(uri.toString().replace("v12", "v11"));
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
} }
@Override @Override
@ -74,13 +89,14 @@ public class StompV12Test extends StompTestBase {
} }
} finally { } finally {
super.tearDown(); super.tearDown();
conn.closeTransport();
} }
} }
@Test @Test
public void testConnection() throws Exception { public void testConnection() throws Exception {
server.getActiveMQServer().getConfiguration().setSecurityEnabled(true); server.getActiveMQServer().getConfiguration().setSecurityEnabled(true);
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
connection.connect(defUser, defPass); connection.connect(defUser, defPass);
@ -90,7 +106,7 @@ public class StompV12Test extends StompTestBase {
connection.disconnect(); connection.disconnect();
connection = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); connection = StompClientConnectionFactory.createClientConnection(uri);
connection.connect(defUser, defPass); connection.connect(defUser, defPass);
@ -100,14 +116,14 @@ public class StompV12Test extends StompTestBase {
connection.disconnect(); connection.disconnect();
connection = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); connection = StompClientConnectionFactory.createClientConnection(uri);
connection.connect(); connection.connect();
Assert.assertFalse(connection.isConnected()); Assert.assertFalse(connection.isConnected());
//new way of connection //new way of connection
StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection(v11Uri);
conn.connect1(defUser, defPass); conn.connect1(defUser, defPass);
Assert.assertTrue(conn.isConnected()); Assert.assertTrue(conn.isConnected());
@ -117,7 +133,7 @@ public class StompV12Test extends StompTestBase {
@Test @Test
public void testConnectionAsInSpec() throws Exception { public void testConnectionAsInSpec() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(v10Uri);
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT); ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser);
@ -133,7 +149,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//need 1.2 client //need 1.2 client
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.STOMP); frame = conn.createFrame(Stomp.Commands.STOMP);
frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser);
@ -151,7 +167,7 @@ public class StompV12Test extends StompTestBase {
@Test @Test
public void testNegotiation() throws Exception { public void testNegotiation() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(v10Uri);
// case 1 accept-version absent. It is a 1.0 connect // case 1 accept-version absent. It is a 1.0 connect
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT); ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
@ -168,7 +184,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 2 accept-version=1.0, result: 1.0 // case 2 accept-version=1.0, result: 1.0
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(v11Uri);
frame = conn.createFrame(Stomp.Commands.CONNECT); frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0"); frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0");
frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
@ -185,7 +201,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 3 accept-version=1.1, result: 1.1 // case 3 accept-version=1.1, result: 1.1
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(v11Uri);
frame = conn.createFrame(Stomp.Commands.CONNECT); frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.1"); frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.1");
frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
@ -202,7 +218,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 4 accept-version=1.0,1.1,1.3, result 1.2 // case 4 accept-version=1.0,1.1,1.3, result 1.2
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(v11Uri);
frame = conn.createFrame(Stomp.Commands.CONNECT); frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.3"); frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.3");
frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
@ -219,7 +235,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
// case 5 accept-version=1.3, result error // case 5 accept-version=1.3, result error
conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); conn = StompClientConnectionFactory.createClientConnection(v11Uri);
frame = conn.createFrame(Stomp.Commands.CONNECT); frame = conn.createFrame(Stomp.Commands.CONNECT);
frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.3"); frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.3");
frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
@ -230,6 +246,8 @@ public class StompV12Test extends StompTestBase {
Assert.assertEquals(Stomp.Responses.ERROR, reply.getCommand()); Assert.assertEquals(Stomp.Responses.ERROR, reply.getCommand());
conn.disconnect();
System.out.println("Got error frame " + reply); System.out.println("Got error frame " + reply);
} }
@ -245,7 +263,7 @@ public class StompV12Test extends StompTestBase {
send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true); send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -281,7 +299,7 @@ public class StompV12Test extends StompTestBase {
send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!"); send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!");
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(v11Uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -315,7 +333,7 @@ public class StompV12Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -376,7 +394,7 @@ public class StompV12Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub", null, null, true); subscribe(newConn, "a-sub", null, null, true);
@ -434,7 +452,7 @@ public class StompV12Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -481,7 +499,7 @@ public class StompV12Test extends StompTestBase {
conn.sendFrame(frame); conn.sendFrame(frame);
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -540,7 +558,7 @@ public class StompV12Test extends StompTestBase {
@Test @Test
public void testHeartBeat() throws Exception { public void testHeartBeat() throws Exception {
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
//no heart beat at all if heat-beat absent //no heart beat at all if heat-beat absent
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -558,7 +576,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//no heart beat for (0,0) //no heart beat for (0,0)
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -579,7 +597,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//heart-beat (1,0), should receive a min client ping accepted by server //heart-beat (1,0), should receive a min client ping accepted by server
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -605,7 +623,7 @@ public class StompV12Test extends StompTestBase {
} }
//heart-beat (1,0), start a ping, then send a message, should be ok. //heart-beat (1,0), start a ping, then send a message, should be ok.
conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -650,7 +668,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
//heart-beat (500,1000) //heart-beat (500,1000)
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
frame = conn.createFrame(Stomp.Commands.CONNECT) frame = conn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -703,7 +721,7 @@ public class StompV12Test extends StompTestBase {
} }
// subscribe // subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass); newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub"); subscribe(newConn, "a-sub");
@ -738,7 +756,7 @@ public class StompV12Test extends StompTestBase {
} }
//subscribe //subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(v11Uri);
try { try {
ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT) ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
@ -795,7 +813,7 @@ public class StompV12Test extends StompTestBase {
} }
// subscribe // subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn = StompClientConnectionFactory.createClientConnection(uri);
frame = newConn.createFrame(Stomp.Commands.CONNECT) frame = newConn.createFrame(Stomp.Commands.CONNECT)
.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
@ -1250,7 +1268,7 @@ public class StompV12Test extends StompTestBase {
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(v11Uri);
newConn.connect(defUser, defPass, "myclientid2"); newConn.connect(defUser, defPass, "myclientid2");
this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
@ -1284,7 +1302,7 @@ public class StompV12Test extends StompTestBase {
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
StompClientConnection connV12_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); StompClientConnection connV12_2 = StompClientConnectionFactory.createClientConnection(v11Uri);
connV12_2.connect(defUser, defPass); connV12_2.connect(defUser, defPass);
this.subscribe(connV12_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); this.subscribe(connV12_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
@ -1423,9 +1441,8 @@ public class StompV12Test extends StompTestBase {
this.subscribe(conn, "sub1", "client", getName()); this.subscribe(conn, "sub1", "client", getName());
this.subscribe(conn, "sub1", "client", getName()); ClientStompFrame frame = this.subscribe(conn, "sub1", "client", getName());
ClientStompFrame frame = conn.receiveFrame();
Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
waitDisconnect(conn); waitDisconnect(conn);
@ -1451,7 +1468,7 @@ public class StompV12Test extends StompTestBase {
sendJmsMessage(getName(), topic); sendJmsMessage(getName(), topic);
conn.destroy(); conn.destroy();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, CLIENT_ID); conn.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
@ -1476,7 +1493,7 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
conn.destroy(); conn.destroy();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, CLIENT_ID); conn.connect(defUser, defPass, CLIENT_ID);
this.unsubscribe(conn, getName(), null, false, true); this.unsubscribe(conn, getName(), null, false, true);
@ -2131,7 +2148,7 @@ public class StompV12Test extends StompTestBase {
sendJmsMessage("second message"); sendJmsMessage("second message");
//reconnect //reconnect
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
frame = conn.receiveFrame(1000); frame = conn.receiveFrame(1000);
@ -2172,10 +2189,10 @@ public class StompV12Test extends StompTestBase {
if (sendDisconnect) { if (sendDisconnect) {
conn.disconnect(); conn.disconnect();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
} else { } else {
conn.destroy(); conn.destroy();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
} }
// message should be received since message was not acknowledged // message should be received since message was not acknowledged
@ -2190,7 +2207,7 @@ public class StompV12Test extends StompTestBase {
// now let's make sure we don't see the message again // now let's make sure we don't see the message again
conn.destroy(); conn.destroy();
conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass); conn.connect(defUser, defPass);
this.subscribe(conn, "sub1", null, null, true); this.subscribe(conn, "sub1", null, null, true);