Cleanup the STOMP tests framework and shorten the test duration by only
using resources in each test that are actually needed, also fixes some
issues with tests that ran a long time just waiting for an end signal.
This commit is contained in:
Timothy Bish 2015-05-20 19:22:43 -04:00
parent e99c814830
commit 540a66baa1
28 changed files with 359 additions and 648 deletions

View File

@ -39,9 +39,11 @@ import org.slf4j.LoggerFactory;
// https://issues.apache.org/jira/browse/AMQ-3393
public class ConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
private BrokerService brokerService;
private final Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public void startBroker() throws Exception {

View File

@ -22,25 +22,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp11NIOSSLTest extends Stomp11Test {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override

View File

@ -19,14 +19,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class Stomp11NIOTest extends Stomp11Test {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override

View File

@ -22,34 +22,21 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp11SslAuthTest extends Stomp11Test {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug","ssl,handshake");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
protected boolean isUseSslConnector() {
return true;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
protected String getAdditionalConfig() {
return "?needClientAuth=true";
}
@Override

View File

@ -34,7 +34,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -64,12 +63,6 @@ public class Stomp11Test extends StompTestSupport {
connection.start();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Test(timeout = 60000)
public void testConnect() throws Exception {
@ -89,9 +82,6 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("response-id:1") >= 0);
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -119,9 +109,6 @@ public class Stomp11Test extends StompTestSupport {
LOG.info("session header follows: " + f);
assertTrue(f.startsWith("ID:"));
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -141,9 +128,6 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -163,9 +147,6 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.0") >= 0);
assertTrue(f.indexOf("session:") >= 0);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -226,9 +207,6 @@ public class Stomp11Test extends StompTestSupport {
long endTime = System.currentTimeMillis();
assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
}
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -308,9 +286,6 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(stompFrame.getAction().equals("MESSAGE"));
service.shutdownNow();
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -452,9 +427,6 @@ public class Stomp11Test extends StompTestSupport {
fail("No message should have been received since subscription was removed");
} catch (SocketTimeoutException e) {
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -479,9 +451,6 @@ public class Stomp11Test extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -512,9 +481,6 @@ public class Stomp11Test extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -551,9 +517,6 @@ public class Stomp11Test extends StompTestSupport {
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -596,9 +559,6 @@ public class Stomp11Test extends StompTestSupport {
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -634,9 +594,6 @@ public class Stomp11Test extends StompTestSupport {
received = stompConnection.receive();
assertNotNull(received);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -702,9 +659,6 @@ public class Stomp11Test extends StompTestSupport {
}
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -736,9 +690,6 @@ public class Stomp11Test extends StompTestSupport {
assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message;
assertEquals("GroupID", "abc", amqMessage.getGroupID());
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -766,9 +717,6 @@ public class Stomp11Test extends StompTestSupport {
assertNotNull(message);
assertEquals("Hello World", message.getText());
assertEquals("newest", message.getStringProperty("value"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -795,9 +743,6 @@ public class Stomp11Test extends StompTestSupport {
final String expectedEncoded = "\\\\value\\c";
final String headerVal = frame.substring(start, start + expectedEncoded.length());
assertEquals("" + frame, expectedEncoded, headerVal);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -851,9 +796,6 @@ public class Stomp11Test extends StompTestSupport {
frame = "UNSUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -894,9 +836,6 @@ public class Stomp11Test extends StompTestSupport {
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -988,9 +927,6 @@ public class Stomp11Test extends StompTestSupport {
assertEquals("4", receipt.getHeaders().get("receipt-id"));
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -1064,9 +1000,6 @@ public class Stomp11Test extends StompTestSupport {
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
assertEquals(view.getDurableTopicSubscribers().length, 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -1182,8 +1115,5 @@ public class Stomp11Test extends StompTestSupport {
assertEquals(view.getDurableTopicSubscribers().length, 2);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
}

View File

@ -22,25 +22,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp12NIOSSLTest extends Stomp12Test {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override

View File

@ -19,14 +19,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class Stomp12NIOTest extends Stomp12Test {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override

View File

@ -22,34 +22,21 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp12SslAuthTest extends Stomp12Test {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug","ssl,handshake");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
protected boolean isUseSslConnector() {
return true;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
protected String getAdditionalConfig() {
return "?needClientAuth=true";
}
@Override

View File

@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.util.Wait;
import org.junit.Test;
@ -50,9 +49,12 @@ public class Stomp12Test extends StompTestSupport {
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
public void tearDown() throws Exception {
try {
connection.close();
} catch (Exception ex) {}
super.tearDown();
}
@Override
@ -65,7 +67,7 @@ public class Stomp12Test extends StompTestSupport {
return getClass().getName() + "." + getName();
}
@Test
@Test(timeout = 60000)
public void testTelnetStyleSends() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
@ -100,12 +102,9 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
@Test(timeout = 60000)
public void testClientAckWithoutAckId() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
@ -157,12 +156,9 @@ public class Stomp12Test extends StompTestSupport {
received = stompConnection.receive();
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
@Test(timeout = 60000)
public void testClientAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
@ -258,12 +254,9 @@ public class Stomp12Test extends StompTestSupport {
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
@Test(timeout = 60000)
public void testClientIndividualAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
@ -364,12 +357,9 @@ public class Stomp12Test extends StompTestSupport {
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(disconnect);
}
@Test
@Test(timeout = 60000)
public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
@ -432,12 +422,9 @@ public class Stomp12Test extends StompTestSupport {
}
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testQueueBrowerNotInAutoAckMode() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
@ -467,9 +454,6 @@ public class Stomp12Test extends StompTestSupport {
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -534,9 +518,6 @@ public class Stomp12Test extends StompTestSupport {
assertEquals("3", receipt.getHeaders().get("receipt-id"));
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
@ -561,8 +542,5 @@ public class Stomp12Test extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
}

View File

@ -51,6 +51,15 @@ public class StompAdvisoryTest extends StompTestSupport {
protected ActiveMQConnection connection;
@Override
public void tearDown() throws Exception {
try {
connection.close();
} catch (Exception ex) {}
super.tearDown();
}
@Override
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
plugins.add(new StatisticsBrokerPlugin());
@ -78,9 +87,10 @@ public class StompAdvisoryTest extends StompTestSupport {
brokerService.setAdvisorySupport(true);
}
@Test
@Test(timeout = 60000)
public void testConnectionAdvisory() throws Exception {
stompConnect();
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO);
@ -105,8 +115,9 @@ public class StompAdvisoryTest extends StompTestSupport {
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
@Test
@Test(timeout = 60000)
public void testConnectionAdvisoryJSON() throws Exception {
stompConnect();
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_JSON.toString());
@ -136,8 +147,9 @@ public class StompAdvisoryTest extends StompTestSupport {
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
@Test
@Test(timeout = 60000)
public void testConnectionAdvisoryXML() throws Exception {
stompConnect();
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_XML.toString());
@ -167,8 +179,9 @@ public class StompAdvisoryTest extends StompTestSupport {
assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
}
@Test
@Test(timeout = 60000)
public void testConsumerAdvisory() throws Exception {
stompConnect();
Destination dest = new ActiveMQQueue("testConsumerAdvisory");
@ -193,8 +206,9 @@ public class StompAdvisoryTest extends StompTestSupport {
c.close();
}
@Test
@Test(timeout = 60000)
public void testProducerAdvisory() throws Exception {
stompConnect();
Destination dest = new ActiveMQQueue("testProducerAdvisory");
@ -220,8 +234,9 @@ public class StompAdvisoryTest extends StompTestSupport {
c.close();
}
@Test
@Test(timeout = 60000)
public void testProducerAdvisoryXML() throws Exception {
stompConnect();
Destination dest = new ActiveMQQueue("testProducerAdvisoryXML");
@ -251,8 +266,9 @@ public class StompAdvisoryTest extends StompTestSupport {
c.close();
}
@Test
@Test(timeout = 60000)
public void testProducerAdvisoryJSON() throws Exception {
stompConnect();
Destination dest = new ActiveMQQueue("testProducerAdvisoryJSON");
@ -282,7 +298,7 @@ public class StompAdvisoryTest extends StompTestSupport {
c.close();
}
@Test
@Test(timeout = 60000)
public void testStatisticsAdvisory() throws Exception {
Connection c = cf.createConnection("system", "manager");
c.start();
@ -312,6 +328,7 @@ public class StompAdvisoryTest extends StompTestSupport {
});
child.start();
stompConnect();
// Attempt to gather the statistics response from the previous request.
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/" + replyTo.getTopicName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);

View File

@ -20,7 +20,9 @@ package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -28,11 +30,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
@ -45,21 +42,11 @@ public class StompLoadTest extends StompTestSupport {
private static final int TASK_COUNT = 100;
private static final int MSG_COUNT = 250; // AMQ-3819: Above 250 or so and the CPU goes bonkers with NOI+SSL.
protected Connection connection;
protected Session session;
protected ActiveMQQueue queue;
private ExecutorService executor;
private CountDownLatch started;
private CountDownLatch ready;
private AtomicInteger receiveCount;
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Override
public void setUp() throws Exception {
@ -68,11 +55,6 @@ public class StompLoadTest extends StompTestSupport {
stompConnect();
stompConnection.connect("system", "manager");
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getTopicName());
connection.start();
executor = Executors.newFixedThreadPool(TASK_COUNT, new ThreadFactory() {
private long i = 0;
@ -94,7 +76,6 @@ public class StompLoadTest extends StompTestSupport {
public void tearDown() throws Exception {
try {
executor.shutdownNow();
connection.close();
} catch (Exception e) {
} finally {
super.tearDown();
@ -104,6 +85,8 @@ public class StompLoadTest extends StompTestSupport {
@Test(timeout=5*60*1000)
public void testStompUnloadLoad() throws Exception {
final List<StompConnection> taskConnections = new ArrayList<>();
for (int i = 0; i < TASK_COUNT; ++i) {
executor.execute(new Runnable() {
@ -122,6 +105,8 @@ public class StompLoadTest extends StompTestSupport {
LOG.error("Caught Exception while connecting: " + e.getMessage());
}
taskConnections.add(connection);
try {
for (int i = 0; i < 10; i++) {
@ -139,7 +124,7 @@ public class StompLoadTest extends StompTestSupport {
TimeUnit.SECONDS.sleep(3);
started.countDown();
while (true) {
while (receiveCount.get() != TASK_COUNT * MSG_COUNT) {
// Read Timeout ends this task, we override the default here since there
// are so many threads running and we don't know how slow the test box is.
StompFrame frame = connection.receive(TimeUnit.SECONDS.toMillis(60));
@ -190,10 +175,17 @@ public class StompLoadTest extends StompTestSupport {
LOG.info("Test Completed and all messages received, shutting down.");
for (StompConnection taskConnection : taskConnections) {
try {
taskConnection.disconnect();
taskConnection.close();
} catch (Exception ex) {
}
}
executor.shutdown();
executor.awaitTermination(2, TimeUnit.MINUTES);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
LOG.info("Test Finished.");
stompDisconnect();
}
}

View File

@ -27,7 +27,6 @@ import java.util.Arrays;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Test;
public class StompMaxDataSizeTest extends StompTestSupport {
@ -36,17 +35,6 @@ public class StompMaxDataSizeTest extends StompTestSupport {
private StompConnection connection;
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
}
@Override
public void tearDown() throws Exception {
if (connection != null) {
@ -58,21 +46,23 @@ public class StompMaxDataSizeTest extends StompTestSupport {
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = null;
protected boolean isUseSslConnector() {
return true;
}
connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+ sslPort +
"?transport.maxDataLength=" + TEST_MAX_DATA_SIZE);
sslPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp://0.0.0.0:" + port +
"?transport.maxDataLength=" + TEST_MAX_DATA_SIZE);
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:" + nioPort +
"?transport.maxDataLength=" + TEST_MAX_DATA_SIZE);
nioPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:" + nioSslPort +
"?transport.maxDataLength=" + TEST_MAX_DATA_SIZE);
nioSslPort = connector.getConnectUri().getPort();
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override
protected String getAdditionalConfig() {
return "?transport.maxDataLength=" + TEST_MAX_DATA_SIZE;
}
@Test(timeout = 60000)

View File

@ -28,7 +28,6 @@ import java.util.Collection;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -66,32 +65,23 @@ public class StompMaxFrameSizeTest extends StompTestSupport {
}
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseSslConnector() {
return true;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = null;
protected boolean isUseNioConnector() {
return true;
}
connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+ sslPort +
"?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize);
sslPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp://0.0.0.0:" + port +
"?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize);
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:" + nioPort +
"?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize);
nioPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:" + nioSslPort +
"?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize);
nioSslPort = connector.getConnectUri().getPort();
@Override
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override
protected String getAdditionalConfig() {
return "?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize;
}
/**

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.UUID;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,13 +38,7 @@ public class StompMissingMessageTest extends StompTestSupport {
destination = "/topic/" + getTopicName();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Test
@Test(timeout = 60000)
public void testProducerConsumerLoop() throws Exception {
final int ITERATIONS = 500;
int received = 0;
@ -88,7 +81,7 @@ public class StompMissingMessageTest extends StompTestSupport {
return message;
}
@Test
@Test(timeout = 60000)
public void testProducerDurableConsumerLoop() throws Exception {
final int ITERATIONS = 500;
int received = 0;

View File

@ -20,14 +20,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOLoadTest extends StompLoadTest {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override

View File

@ -23,25 +23,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOSSLLoadTest extends StompLoadTest {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override

View File

@ -22,25 +22,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOSSLTest extends StompTest {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override

View File

@ -19,14 +19,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOTest extends StompTest {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override

View File

@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompPrefetchTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompPrefetchTest.class);
@Override
@ -57,7 +58,7 @@ public class StompPrefetchTest extends StompTestSupport {
@Test(timeout = 60000)
public void testTopicSubPrefetch() throws Exception {
stompConnect();
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/T", Stomp.Headers.Subscribe.AckModeValues.AUTO);
@ -66,6 +67,7 @@ public class StompPrefetchTest extends StompTestSupport {
@Test(timeout = 60000)
public void testDurableSubPrefetch() throws Exception {
stompConnect();
stompConnection.connect("system", "manager");
HashMap<String,String> headers = new HashMap<String, String>();
headers.put("id", "durablesub");
@ -83,6 +85,7 @@ public class StompPrefetchTest extends StompTestSupport {
headers.put("browser", "true");
headers.put("accept-version","1.1");
stompConnect();
stompConnection.connect(headers);
stompConnection.subscribe("/queue/Q", Stomp.Headers.Subscribe.AckModeValues.AUTO, headers);
@ -91,6 +94,7 @@ public class StompPrefetchTest extends StompTestSupport {
@Test(timeout = 60000)
public void testQueueSubPrefetch() throws Exception {
stompConnect();
stompConnection.connect("system", "manager");
stompConnection.subscribe("/queue/Q", Stomp.Headers.Subscribe.AckModeValues.AUTO);

View File

@ -23,25 +23,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class StompSSLLoadTest extends StompLoadTest {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
protected boolean isUseSslConnector() {
return true;
}
@Override

View File

@ -22,6 +22,7 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.security.JaasCertificateAuthenticationPlugin;
@ -29,16 +30,13 @@ import org.apache.activemq.security.JaasCertificateAuthenticationPlugin;
public class StompSslAuthTest extends StompTest {
@Override
public void setUp() throws Exception {
protected boolean isUseTcpConnector() {
return false;
}
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug","ssl,handshake");
super.setUp();
@Override
protected boolean isUseSslConnector() {
return true;
}
@Override
@ -55,17 +53,14 @@ public class StompSslAuthTest extends StompTest {
}
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
public void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("ssl://0.0.0.0:0?needClientAuth=true");
cf = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
protected String getAdditionalConfig() {
return "?needClientAuth=true";
}
// NOOP - These operations handled by jaas cert login module

View File

@ -22,25 +22,16 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class StompSslTest extends StompTest {
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.setUp();
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
protected boolean isUseSslConnector() {
return true;
}
@Override

View File

@ -40,9 +40,9 @@ public class StompSubscriptionRemoveTest extends StompTestSupport {
private static final String COMMAND_MESSAGE = "MESSAGE";
private static final String HEADER_MESSAGE_ID = "message-id";
@Test
@Test(timeout = 60000)
public void testRemoveSubscriber() throws Exception {
stompConnect();
Connection connection = cf.createConnection("system", "manager");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue(getQueueName()));
@ -76,11 +76,8 @@ public class StompSubscriptionRemoveTest extends StompTestSupport {
++count;
}
stompConnection.sendFrame("DISCONNECT\n\n");
Thread.sleep(1000);
stompConnection.close();
stompDisconnect();
Thread.sleep(1000);
stompConnect();
connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;

View File

@ -31,7 +31,7 @@ public class StompTelnetTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTelnetTest.class);
@Test
@Test(timeout = 60000)
public void testCRLF() throws Exception {
for (TransportConnector connector : brokerService.getTransportConnectors()) {
@ -50,7 +50,7 @@ public class StompTelnetTest extends StompTestSupport {
}
}
@Test
@Test(timeout = 60000)
public void testCRLF11() throws Exception {
for (TransportConnector connector : brokerService.getTransportConnectors()) {
@ -79,14 +79,6 @@ public class StompTelnetTest extends StompTestSupport {
return null;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
protected Socket createSocket(int port) throws IOException {
return new Socket("127.0.0.1", port);
}

View File

@ -45,7 +45,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -154,12 +153,6 @@ public class StompTest extends StompTestSupport {
}
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
public void sendMessage(String msg) throws Exception {
sendMessage(msg, "foo", "xyz");
}
@ -178,7 +171,7 @@ public class StompTest extends StompTestSupport {
producer.send(message);
}
@Test
@Test(timeout = 60000)
public void testConnect() throws Exception {
String connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "request-id:1\n" + "\n" + Stomp.NULL;
@ -189,7 +182,7 @@ public class StompTest extends StompTestSupport {
assertTrue(f.indexOf("response-id:1") >= 0);
}
@Test
@Test(timeout = 60000)
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -215,7 +208,7 @@ public class StompTest extends StompTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
@Test
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -235,7 +228,7 @@ public class StompTest extends StompTestSupport {
assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID());
}
@Test
@Test(timeout = 60000)
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
@ -257,7 +250,7 @@ public class StompTest extends StompTestSupport {
assertEquals("bar", "123", message.getStringProperty("bar"));
}
@Test
@Test(timeout = 60000)
public void testSendMessageWithDelay() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -278,7 +271,7 @@ public class StompTest extends StompTestSupport {
assertNotNull(message);
}
@Test
@Test(timeout = 60000)
public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -308,7 +301,7 @@ public class StompTest extends StompTestSupport {
assertEquals("GroupID", "abc", amqMessage.getGroupID());
}
@Test
@Test(timeout = 60000)
public void testSendMessageWithNoPriorityReceivesDefault() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -330,7 +323,7 @@ public class StompTest extends StompTestSupport {
assertEquals("getJMSPriority", 4, message.getJMSPriority());
}
@Test
@Test(timeout = 60000)
public void testReceipts() throws Exception {
StompConnection receiver = new StompConnection();
@ -380,12 +373,9 @@ public class StompTest extends StompTestSupport {
TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message);
assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscriptionReceipts() throws Exception {
final int done = 20;
int count = 0;
@ -435,7 +425,7 @@ public class StompTest extends StompTestSupport {
} while (count < done);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -451,12 +441,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -481,12 +468,9 @@ public class StompTest extends StompTestSupport {
assertEquals("5", clMmatcher.group(1));
assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -508,12 +492,9 @@ public class StompTest extends StompTestSupport {
assertEquals("5", length);
assertEquals(5, message.getContent().length);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSendMultipleBytesMessages() throws Exception {
final int MSG_COUNT = 50;
@ -541,12 +522,9 @@ public class StompTest extends StompTestSupport {
assertEquals(5, message.getContent().length);
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithMessageSentWithProperties() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -572,12 +550,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@ -613,12 +588,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -636,12 +608,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndNumericSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -664,12 +633,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndBooleanSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -692,12 +658,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAnFloatSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -720,12 +683,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithClientAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -750,7 +710,7 @@ public class StompTest extends StompTestSupport {
assertTrue(message.getJMSRedelivered());
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithClientAckedAndContentLength() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -783,9 +743,6 @@ public class StompTest extends StompTestSupport {
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompDisconnect();
// message should not be received since it was acknowledged
@ -794,7 +751,7 @@ public class StompTest extends StompTestSupport {
assertNull(message);
}
@Test
@Test(timeout = 60000)
public void testUnsubscribe() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -830,7 +787,7 @@ public class StompTest extends StompTestSupport {
}
}
@Test
@Test(timeout = 60000)
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -853,7 +810,7 @@ public class StompTest extends StompTestSupport {
assertNotNull("Should have received a message", message);
}
@Test
@Test(timeout = 60000)
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -888,7 +845,7 @@ public class StompTest extends StompTestSupport {
assertEquals("second message", message.getText().trim());
}
@Test
@Test(timeout = 60000)
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
assertClients(1);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -903,7 +860,7 @@ public class StompTest extends StompTestSupport {
assertClients(1);
}
@Test
@Test(timeout = 60000)
public void testConnectNotAuthenticatedWrongUser() throws Exception {
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -914,7 +871,7 @@ public class StompTest extends StompTestSupport {
assertClients(1);
}
@Test
@Test(timeout = 60000)
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
@ -926,7 +883,7 @@ public class StompTest extends StompTestSupport {
assertClients(1);
}
@Test
@Test(timeout = 60000)
public void testSendNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -942,7 +899,7 @@ public class StompTest extends StompTestSupport {
assertTrue(f.startsWith("ERROR"));
}
@Test
@Test(timeout = 60000)
public void testSubscribeNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -958,7 +915,7 @@ public class StompTest extends StompTestSupport {
assertTrue(frame.startsWith("ERROR"));
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithReceiptNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -976,7 +933,7 @@ public class StompTest extends StompTestSupport {
assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
}
@Test
@Test(timeout = 60000)
public void testSubscribeWithInvalidSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -990,12 +947,9 @@ public class StompTest extends StompTestSupport {
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationUnknownTranslator() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1014,7 +968,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Hello World", message.getText());
}
@Test
@Test(timeout = 60000)
public void testTransformationFailed() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1034,7 +988,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Hello World", message.getText());
}
@Test
@Test(timeout = 60000)
public void testTransformationSendXMLObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1059,7 +1013,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Dejan", object.getName());
}
@Test
@Test(timeout = 60000)
public void testTransformationSendJSONObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1079,7 +1033,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Dejan", object.getName());
}
@Test
@Test(timeout = 60000)
public void testTransformationSubscribeXML() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1098,12 +1052,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveJSONObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1121,12 +1072,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1145,12 +1093,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1169,12 +1114,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveXMLObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1207,12 +1149,9 @@ public class StompTest extends StompTestSupport {
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveJSONObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1249,12 +1188,9 @@ public class StompTest extends StompTestSupport {
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationSendAndReceiveXmlMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1284,7 +1220,7 @@ public class StompTest extends StompTestSupport {
assertTrue(xmlFrame.getHeaders().containsValue("jms-map-xml"));
}
@Test
@Test(timeout = 60000)
public void testTransformationSendAndReceiveJsonMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1315,7 +1251,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Belgrade", map.get("city"));
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveBytesMessage() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1341,12 +1277,9 @@ public class StompTest extends StompTestSupport {
assertEquals("5", clMmatcher.group(1));
assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1365,12 +1298,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationIgnoreTransformation() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1389,12 +1319,9 @@ public class StompTest extends StompTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.endsWith("\n\n"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationSendXMLMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1413,7 +1340,7 @@ public class StompTest extends StompTestSupport {
assertEquals(message.getString("name"), "Dejan");
}
@Test
@Test(timeout = 60000)
public void testTransformationSendJSONMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1432,7 +1359,7 @@ public class StompTest extends StompTestSupport {
assertEquals(message.getString("name"), "Dejan");
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveXMLMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1460,12 +1387,9 @@ public class StompTest extends StompTestSupport {
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransformationReceiveJSONMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1493,12 +1417,9 @@ public class StompTest extends StompTestSupport {
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testDurableUnsub() throws Exception {
// get broker JMX view
@ -1566,7 +1487,7 @@ public class StompTest extends StompTestSupport {
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
}
@Test
@Test(timeout = 60000)
public void testDurableSubAttemptOnQueueFails() throws Exception {
// get broker JMX view
@ -1592,12 +1513,9 @@ public class StompTest extends StompTestSupport {
assertTrue(frame.startsWith("ERROR"));
assertEquals(view.getQueueSubscribers().length, 0);
// disconnect
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testMessageIdHeader() throws Exception {
stompConnection.connect("system", "manager");
@ -1610,7 +1528,7 @@ public class StompTest extends StompTestSupport {
assertNull(stompMessage.getHeaders().get("transaction"));
}
@Test
@Test(timeout = 60000)
public void testPrefetchSizeOfOneClientAck() throws Exception {
stompConnection.connect("system", "manager");
@ -1690,7 +1608,7 @@ public class StompTest extends StompTestSupport {
} catch (SocketTimeoutException soe) {}
}
@Test
@Test(timeout = 60000)
public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager");
@ -1750,7 +1668,7 @@ public class StompTest extends StompTestSupport {
stompDisconnect();
}
@Test
@Test(timeout = 60000)
public void testTransactionsWithMultipleDestinations() throws Exception {
stompConnection.connect("system", "manager");
@ -1775,11 +1693,9 @@ public class StompTest extends StompTestSupport {
StompFrame frame = stompConnection.receive(500);
assertNotNull(frame);
stompConnection.disconnect();
}
@Test
@Test(timeout = 60000)
public void testTempDestination() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1798,7 +1714,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Hello World", message.getBody());
}
@Test
@Test(timeout = 60000)
public void testJMSXUserIDIsSetInMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1818,7 +1734,7 @@ public class StompTest extends StompTestSupport {
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
}
@Test
@Test(timeout = 60000)
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1837,7 +1753,7 @@ public class StompTest extends StompTestSupport {
assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID));
}
@Test
@Test(timeout = 60000)
public void testClientSetMessageIdIsIgnored() throws Exception {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Message.MESSAGE_ID, "Thisisnotallowed");
@ -1864,7 +1780,7 @@ public class StompTest extends StompTestSupport {
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
}
@Test
@Test(timeout = 60000)
public void testExpire() throws Exception {
stompConnection.connect("system", "manager");
@ -1881,7 +1797,7 @@ public class StompTest extends StompTestSupport {
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.ORIGINAL_DESTINATION), "/queue/" + getQueueName());
}
@Test
@Test(timeout = 60000)
public void testDefaultJMSReplyToDest() throws Exception {
stompConnection.connect("system", "manager");
@ -1897,7 +1813,7 @@ public class StompTest extends StompTestSupport {
assertEquals("" + stompMessage, stompMessage.getHeaders().get(Stomp.Headers.Send.REPLY_TO), "JustAString");
}
@Test
@Test(timeout = 60000)
public void testPersistent() throws Exception {
stompConnection.connect("system", "manager");
@ -1914,7 +1830,7 @@ public class StompTest extends StompTestSupport {
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT), "true");
}
@Test
@Test(timeout = 60000)
public void testPersistentDefaultValue() throws Exception {
stompConnection.connect("system", "manager");
@ -1929,7 +1845,7 @@ public class StompTest extends StompTestSupport {
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
}
@Test
@Test(timeout = 60000)
public void testReceiptNewQueue() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1961,12 +1877,9 @@ public class StompTest extends StompTestSupport {
String length = message.getHeaders().get("content-length");
assertEquals("0", length);
assertEquals(0, message.getContent().length);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testTransactedClientAckBrokerStats() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -2007,7 +1920,7 @@ public class StompTest extends StompTestSupport {
assertEquals(0, queueView.getQueueSize());
}
@Test
@Test(timeout = 60000)
public void testReplytoModification() throws Exception {
String replyto = "some destination";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -2025,11 +1938,9 @@ public class StompTest extends StompTestSupport {
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().equals("MESSAGE"));
assertEquals(replyto, message.getHeaders().get("reply-to"));
stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
}
@Test
@Test(timeout = 60000)
public void testReplyToDestinationNaming() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -2042,7 +1953,7 @@ public class StompTest extends StompTestSupport {
doTestActiveMQReplyToTempDestination("queue");
}
@Test
@Test(timeout = 60000)
public void testSendNullBodyTextMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -2056,9 +1967,6 @@ public class StompTest extends StompTestSupport {
sendMessage(null);
frame = stompConnection.receiveFrame();
assertNotNull("Message not received", frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
private void doTestActiveMQReplyToTempDestination(String type) throws Exception {
@ -2103,7 +2011,7 @@ public class StompTest extends StompTestSupport {
}
}
@Test
@Test(timeout = 60000)
public void testReplyToAcrossConnections() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -2194,7 +2102,7 @@ public class StompTest extends StompTestSupport {
assertEquals("Number of clients", expected, actual);
}
@Test
@Test(timeout = 60000)
public void testDisconnectDoesNotDeadlockBroker() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestConnectionLeak();
@ -2245,14 +2153,9 @@ public class StompTest extends StompTestSupport {
fail("Received a frame that we were not expecting.");
}
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.close();
}
@Test
@Test(timeout = 60000)
public void testHeaderValuesAreTrimmed1_0() throws Exception {
String connectFrame = "CONNECT\n" +
@ -2290,12 +2193,9 @@ public class StompTest extends StompTestSupport {
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test
@Test(timeout = 60000)
public void testSendReceiveBigMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;

View File

@ -23,13 +23,13 @@ import java.net.Socket;
import java.security.ProtectionDomain;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import javax.jms.JMSException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@ -56,15 +56,16 @@ public class StompTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(StompTestSupport.class);
protected final AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected BrokerService brokerService;
protected int openwirePort;
protected int port;
protected int sslPort;
protected int nioPort;
protected int nioSslPort;
protected String jmsUri = "vm://localhost";
protected StompConnection stompConnection = new StompConnection();
protected StompConnection stompConnection;
protected ActiveMQConnectionFactory cf;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
@Rule public TestName name = new TestName();
@ -94,15 +95,11 @@ public class StompTestSupport {
@Before
public void setUp() throws Exception {
LOG.info("========== start " + getName() + " ==========");
autoFailTestSupport.startAutoFailThread();
startBroker();
stompConnect();
}
@After
public void tearDown() throws Exception {
LOG.info("========== finished " + getName() + " ==========");
autoFailTestSupport.stopAutoFailThread();
try {
stompDisconnect();
} catch (Exception ex) {
@ -110,11 +107,12 @@ public class StompTestSupport {
} finally {
stopBroker();
}
LOG.info("========== finished " + getName() + " ==========");
}
public void startBroker() throws Exception {
createBroker();
createBroker(true);
XStreamBrokerContext context = new XStreamBrokerContext();
brokerService.setBrokerContext(context);
@ -134,13 +132,18 @@ public class StompTestSupport {
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
System.setProperty("javax.net.ssl.trustStore", keyStore.getCanonicalPath());
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", trustStore.getCanonicalPath());
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
addStompConnector();
addTranportConnectors();
addOpenWireConnector();
cf = new ActiveMQConnectionFactory(jmsUri);
BrokerPlugin authenticationPlugin = configureAuthentication();
if (authenticationPlugin != null) {
plugins.add(configureAuthorization());
@ -162,16 +165,34 @@ public class StompTestSupport {
brokerService.waitUntilStarted();
}
protected void applyMemoryLimitPolicy() throws Exception {
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
protected void createBroker() throws Exception {
public void restartBroker() throws Exception {
restartBroker(false);
}
public void restartBroker(boolean deleteAllOnStartup) throws Exception {
stopBroker();
createBroker(deleteAllOnStartup);
brokerService.start();
brokerService.waitUntilStarted();
}
protected void createBroker(boolean deleteAllOnStartup) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setPersistent(isPersistent());
brokerService.setDeleteAllMessagesOnStartup(deleteAllOnStartup);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(true);
brokerService.setPopulateJMSXUserID(true);
brokerService.setSchedulerSupport(true);
brokerService.setUseJmx(isUseJmx());
brokerService.getManagementContext().setCreateConnector(false);
brokerService.getManagementContext().setCreateMBeanServer(false);
}
@ -179,6 +200,9 @@ public class StompTestSupport {
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
}
protected void applyMemoryLimitPolicy() throws Exception {
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins"));
@ -253,29 +277,65 @@ public class StompTestSupport {
// NOOP here
}
protected void addOpenWireConnector() throws Exception {
public void addOpenWireConnector() throws Exception {
cf = new ActiveMQConnectionFactory(jmsUri);
}
protected void addStompConnector() throws Exception {
protected void addTranportConnectors() throws Exception {
TransportConnector connector = null;
// Subclasses can tailor this list to speed up the test startup / shutdown
connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"stomp://0.0.0.0:" + port + getAdditionalConfig());
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
LOG.debug("Using amqp port " + port);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:" + sslPort + getAdditionalConfig());
sslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+ssl port " + sslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"stomp+nio://0.0.0.0:" + nioPort + getAdditionalConfig());
nioPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
LOG.debug("Using amqp+nio port " + nioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"stomp+nio+ssl://0.0.0.0:" + nioSslPort + getAdditionalConfig());
nioSslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio+ssl port " + nioSslPort);
}
}
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
protected boolean isPersistent() {
return false;
}
protected boolean isUseJmx() {
return true;
}
protected boolean isUseTcpConnector() {
return true;
}
protected boolean isUseSslConnector() {
return false;
}
protected boolean isUseNioConnector() {
return false;
}
protected boolean isUseNioPlusSslConnector() {
return false;
}
protected String getAdditionalConfig() {
return "";
}
protected StompConnection stompConnect() throws Exception {
@ -303,8 +363,9 @@ public class StompTestSupport {
return getClass().getName() + "." + name.getMethodName();
}
protected void stompDisconnect() throws IOException {
protected void stompDisconnect() throws Exception {
if (stompConnection != null) {
stompConnection.disconnect();
stompConnection.close();
stompConnection = null;
}

View File

@ -20,9 +20,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@ -31,49 +29,30 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompTimeStampingBrokerPluginTest {
public class StompTimeStampingBrokerPluginTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTimeStampingBrokerPluginTest.class);
private BrokerService broker;
private String connectionUri;
private int port;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
private Session session;
@Rule public TestName name = new TestName();
@Before
public void setUp() throws Exception {
TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setPlugins(new BrokerPlugin[] {tsbp});
connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
String stompConnectionUri = broker.addConnector("stomp://0.0.0.0:0").getPublishableConnectString();
URI uri = new URI(stompConnectionUri);
this.port = uri.getPort();
@Override
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
plugins.add(new TimeStampingBrokerPlugin());
}
@Override
protected void applyBrokerPolicies() throws Exception {
// Add policy and individual DLQ strategy
PolicyEntry policy = new PolicyEntry();
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
@ -86,39 +65,21 @@ public class StompTimeStampingBrokerPluginTest {
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.start();
brokerService.setDestinationPolicy(pMap);
}
@Override
public void setUp() throws Exception {
super.setUp();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
connection = cf.createConnection();
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
}
@After
public void tearDown() throws Exception {
broker.stop();
}
protected StompConnection stompConnect() throws Exception {
if (stompConnection == null) {
stompConnection = new StompConnection();
}
stompConnection.open(createSocket());
return stompConnection;
}
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.port);
}
protected String getQueueName() {
return getClass().getName() + "." + name.getMethodName();
}
@Test
@Test(timeout = 60000)
public void testSendMessage() throws Exception {
Destination destination = session.createQueue(getQueueName());

View File

@ -20,8 +20,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -31,12 +29,10 @@ import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.junit.Test;
import org.slf4j.Logger;
@ -49,21 +45,6 @@ public class StompVirtualTopicTest extends StompTestSupport {
private String failMsg = null;
@Override
protected void createBroker() throws Exception {
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost"));
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setDeleteAllMessagesOnStartup(true);
File testDataDir = new File("target/activemq-data/StompVirtualTopicTest");
brokerService.setDataDirectoryFile(testDataDir);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
brokerService.setPersistenceAdapter(persistenceAdapter);
}
@Override
protected void applyMemoryLimitPolicy() throws Exception {
final SystemUsage memoryManager = new SystemUsage();
@ -88,6 +69,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
@Test(timeout = 90000)
public void testStompOnVirtualTopics() throws Exception {
LOG.info("Running Stomp Producer");
stompConnect();
StompConsumer consumerWorker = new StompConsumer(this);
Thread consumer = new Thread(consumerWorker);
@ -244,7 +226,6 @@ public class StompVirtualTopicTest extends StompTestSupport {
}
private long reportQueueStatistics() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:destinationType=Queue" + ",destinationName=Consumer.A.VirtualTopic.FOO"
+ ",type=Broker,brokerName=localhost");
QueueViewMBean queue = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);