From 388c16d0844816ba672d501b29cab236dfe683ea Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 16 Feb 2015 18:50:24 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5590 Reduce test time, add timeouts etc. --- .../activemq/transport/stomp/ConnectTest.java | 26 ++--- .../activemq/transport/stomp/Stomp11Test.java | 107 ++++++++++-------- .../activemq/transport/stomp/Stomp12Test.java | 18 ++- .../transport/stomp/StompPrefetchTest.java | 14 ++- .../activemq/transport/stomp/StompTest.java | 18 +-- .../transport/stomp/StompTestSupport.java | 8 ++ .../StompTimeStampingBrokerPluginTest.java | 2 +- .../stomp/StompVirtualTopicTest.java | 30 ++--- 8 files changed, 131 insertions(+), 92 deletions(-) diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java index 7154996283..1147709654 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java @@ -31,11 +31,9 @@ import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.security.JaasDualAuthenticationPlugin; import org.apache.activemq.util.Wait; - import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +58,7 @@ public class ConnectTest { } } - @Test + @Test(timeout = 60000) public void testStompConnectLeak() throws Exception { brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0"); @@ -84,11 +82,12 @@ public class ConnectTest { public boolean isSatisified() throws Exception { return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); } - })); + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200))); + assertTrue("no exceptions", exceptions.isEmpty()); } - @Test + @Test(timeout = 60000) public void testJaasDualStopWithOpenConnection() throws Exception { brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()}); @@ -120,7 +119,7 @@ public class ConnectTest { public boolean isSatisified() throws Exception { return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); } - })); + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200))); assertTrue("connected on time", doneConnect.await(5, TimeUnit.SECONDS)); brokerService.stop(); @@ -135,7 +134,7 @@ public class ConnectTest { assertTrue("no exceptions", exceptions.isEmpty()); } - @Test + @Test(timeout = 60000) public void testInactivityMonitor() throws Exception { brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=1000,0&transport.useKeepAlive=false"); @@ -159,11 +158,11 @@ public class ConnectTest { t1.start(); assertTrue("one connection", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); - } - })); + @Override + public boolean isSatisified() throws Exception { + return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200))); // and it should be closed due to inactivity assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { @@ -171,7 +170,8 @@ public class ConnectTest { public boolean isSatisified() throws Exception { return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); } - })); + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200))); + assertTrue("no exceptions", exceptions.isEmpty()); } } \ No newline at end of file diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index 2e3a49e968..093116a031 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -38,6 +38,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +70,7 @@ public class Stomp11Test extends StompTestSupport { port = connector.getConnectUri().getPort(); } - @Test + @Test(timeout = 60000) public void testConnect() throws Exception { String connectFrame = "STOMP\n" + @@ -93,7 +94,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testConnectedNeverEncoded() throws Exception { String connectFrame = "STOMP\n" + @@ -123,7 +124,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testConnectWithVersionOptions() throws Exception { String connectFrame = "STOMP\n" + @@ -145,7 +146,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testConnectWithValidFallback() throws Exception { String connectFrame = "STOMP\n" + @@ -167,7 +168,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testConnectWithInvalidFallback() throws Exception { String connectFrame = "STOMP\n" + @@ -186,7 +187,7 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.indexOf("message:") >= 0); } - @Test + @Test(timeout = 60000) public void testHeartbeats() throws Exception { String connectFrame = "STOMP\n" + @@ -230,7 +231,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testHeartbeatsDropsIdleConnection() throws Exception { String connectFrame = "STOMP\n" + @@ -262,7 +263,7 @@ public class Stomp11Test extends StompTestSupport { assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000); } - @Test + @Test(timeout = 60000) public void testHeartbeatsKeepsConnectionOpen() throws Exception { String connectFrame = "STOMP\n" + @@ -312,7 +313,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSendAfterMissingHeartbeat() throws Exception { String connectFrame = "STOMP\n" + "login:system\n" + @@ -330,7 +331,13 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.indexOf("session:") >= 0); LOG.debug("Broker sent: " + f); - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); try { String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + @@ -343,7 +350,7 @@ public class Stomp11Test extends StompTestSupport { } } - @Test + @Test(timeout = 60000) public void testRejectInvalidHeartbeats1() throws Exception { String connectFrame = "STOMP\n" + @@ -363,7 +370,7 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.indexOf("message:") >= 0); } - @Test + @Test(timeout = 60000) public void testRejectInvalidHeartbeats2() throws Exception { String connectFrame = "STOMP\n" + @@ -383,7 +390,7 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.indexOf("message:") >= 0); } - @Test + @Test(timeout = 60000) public void testRejectInvalidHeartbeats3() throws Exception { String connectFrame = "STOMP\n" + @@ -403,7 +410,7 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.indexOf("message:") >= 0); } - @Test + @Test(timeout = 60000) public void testSubscribeAndUnsubscribe() throws Exception { String connectFrame = "STOMP\n" + @@ -431,15 +438,16 @@ public class Stomp11Test extends StompTestSupport { assertTrue(stompFrame.getAction().equals("MESSAGE")); frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "id:12345\n\n" + Stomp.NULL; + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - Thread.sleep(4000); + stompFrame = stompConnection.receive(); + assertTrue(stompFrame.getAction().equals("RECEIPT")); stompConnection.sendFrame(message); try { - frame = stompConnection.receiveFrame(); + frame = stompConnection.receiveFrame(2000); LOG.info("Received frame: " + frame); fail("No message should have been received since subscription was removed"); } catch (SocketTimeoutException e) { @@ -449,7 +457,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSubscribeWithNoId() throws Exception { String connectFrame = "STOMP\n" + @@ -476,7 +484,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testUnsubscribeWithNoId() throws Exception { String connectFrame = "STOMP\n" + @@ -493,10 +501,11 @@ public class Stomp11Test extends StompTestSupport { assertTrue(f.startsWith("CONNECTED")); String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + "receipt:1\n" + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - Thread.sleep(2000); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("RECEIPT")); frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -508,7 +517,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testAckMessageWithId() throws Exception { String connectFrame = "STOMP\n" + @@ -547,7 +556,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testAckMessageWithNoId() throws Exception { String connectFrame = "STOMP\n" + @@ -592,7 +601,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSubscribeWithWildcardSubscription() throws Exception { String connectFrame = "STOMP\n" + "login:system\n" + @@ -630,7 +639,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testQueueBrowerSubscription() throws Exception { final int MSG_COUNT = 10; @@ -677,10 +686,11 @@ public class Stomp11Test extends StompTestSupport { assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "id:12345\n\n" + Stomp.NULL; + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(unsub); - Thread.sleep(2000); + String receipt = stompConnection.receiveFrame(); + assertTrue(receipt.contains("RECEIPT")); subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(subscribe); @@ -697,7 +707,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSendMessageWithStandardHeadersEncoded() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -731,7 +741,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSendMessageWithRepeatedEntries() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -761,7 +771,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception { String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL; @@ -790,7 +800,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testNackMessage() throws Exception { String connectFrame = "STOMP\n" + @@ -846,7 +856,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testHeaderValuesAreNotWSTrimmed() throws Exception { stompConnection.setVersion(Stomp.V1_1); String connectFrame = "STOMP\n" + @@ -889,7 +899,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testDurableSubAndUnSubOnTwoTopics() throws Exception { stompConnection.setVersion(Stomp.V1_1); @@ -937,9 +947,13 @@ public class Stomp11Test extends StompTestSupport { frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); // reconnect and send some messages to the offline subscribers and then try to get // them after subscribing again. @@ -979,7 +993,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testDurableSubAndUnSubFlow() throws Exception { stompConnection.setVersion(Stomp.V1_1); @@ -1055,14 +1069,11 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); } - @Test + @Test(timeout = 60000) public void testMultipleDurableSubsWithOfflineMessages() throws Exception { stompConnection.setVersion(Stomp.V1_1); - String domain = "org.apache.activemq"; - ObjectName brokerName = new ObjectName(domain + ":type=Broker,brokerName=localhost"); - - BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); + final BrokerViewMBean view = getProxyToBroker(); String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL; @@ -1102,9 +1113,13 @@ public class Stomp11Test extends StompTestSupport { frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return view.getCurrentConnectionsCount() == 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); // reconnect and send some messages to the offline subscribers and then try to get // them after subscribing again. @@ -1112,7 +1127,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(connectFrame); frame = stompConnection.receiveFrame(); LOG.debug("Broker sent: " + frame); - assertTrue(frame.startsWith("CONNECTED")); + assertTrue(frame.contains("CONNECTED")); assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getInactiveDurableTopicSubscribers().length, 2); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java index 23e0754314..1ccdbd80f1 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -21,10 +21,12 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.Socket; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,9 +220,14 @@ public class Stomp12Test extends StompTestSupport { frame = "DISCONNECT\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); // reconnect and send some messages to the offline subscribers and then try to get // them after subscribing again. @@ -404,10 +411,11 @@ public class Stomp12Test extends StompTestSupport { assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "id:12345\n\n" + Stomp.NULL; + "receipt:1" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(unsub); - Thread.sleep(2000); + StompFrame stompFrame = stompConnection.receive(); + assertTrue(stompFrame.getAction().equals("RECEIPT")); subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(subscribe); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompPrefetchTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompPrefetchTest.java index b5e1c60853..e143a83319 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompPrefetchTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompPrefetchTest.java @@ -20,6 +20,7 @@ package org.apache.activemq.transport.stomp; import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.concurrent.TimeUnit; import javax.jms.Destination; @@ -50,9 +51,11 @@ public class StompPrefetchTest extends StompTestSupport { brokerService.setDestinationPolicy(pMap); brokerService.setAdvisorySupport(true); + brokerService.setUseJmx(false); + brokerService.setPersistent(false); } - @Test + @Test(timeout = 60000) public void testTopicSubPrefetch() throws Exception { stompConnection.connect("system", "manager"); @@ -61,7 +64,7 @@ public class StompPrefetchTest extends StompTestSupport { verifyPrefetch(10, new ActiveMQTopic("T")); } - @Test + @Test(timeout = 60000) public void testDurableSubPrefetch() throws Exception { stompConnection.connect("system", "manager"); HashMap headers = new HashMap(); @@ -71,7 +74,7 @@ public class StompPrefetchTest extends StompTestSupport { verifyPrefetch(10, new ActiveMQTopic("T")); } - @Test + @Test(timeout = 60000) public void testQBrowserSubPrefetch() throws Exception { HashMap headers = new HashMap(); headers.put("login","system"); @@ -86,7 +89,7 @@ public class StompPrefetchTest extends StompTestSupport { verifyPrefetch(10, new ActiveMQQueue("Q")); } - @Test + @Test(timeout = 60000) public void testQueueSubPrefetch() throws Exception { stompConnection.connect("system", "manager"); stompConnection.subscribe("/queue/Q", Stomp.Headers.Subscribe.AckModeValues.AUTO); @@ -107,7 +110,6 @@ public class StompPrefetchTest extends StompTestSupport { } return false; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); } - } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 39adc6ba64..a3419d719c 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -602,7 +602,7 @@ public class StompTest extends StompTestSupport { } // sleep a while before publishing another set of messages - TimeUnit.SECONDS.sleep(1); + TimeUnit.MILLISECONDS.sleep(500); for (int i = 0; i < ctr; ++i) { data[i] = getName() + ":second:" + i; @@ -781,7 +781,7 @@ public class StompTest extends StompTestSupport { LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount()); return queueView.getDequeueCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -1533,9 +1533,13 @@ public class StompTest extends StompTestSupport { // disconnect frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - try { - Thread.sleep(1000); - } catch (InterruptedException e){} + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); //reconnect stompConnect(); @@ -1556,7 +1560,7 @@ public class StompTest extends StompTestSupport { public boolean isSatisified() throws Exception { return view.getDurableTopicSubscribers().length == 0 && view.getInactiveDurableTopicSubscribers().length == 0; } - }); + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getInactiveDurableTopicSubscribers().length, 0); @@ -2183,7 +2187,7 @@ public class StompTest extends StompTestSupport { public boolean isSatisified() throws Exception { return brokerService.getBroker().getClients().length == expected; } - }); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)); org.apache.activemq.broker.Connection[] clients = brokerService.getBroker().getClients(); int actual = clients.length; diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java index e763552028..aa8b136b5e 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java @@ -50,9 +50,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StompTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(StompTestSupport.class); + protected final AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; protected BrokerService brokerService; protected int port; @@ -90,6 +94,7 @@ public class StompTestSupport { @Before public void setUp() throws Exception { + LOG.info("========== start " + getName() + " =========="); autoFailTestSupport.startAutoFailThread(); startBroker(); stompConnect(); @@ -97,6 +102,7 @@ public class StompTestSupport { @After public void tearDown() throws Exception { + LOG.info("========== finished " + getName() + " =========="); autoFailTestSupport.stopAutoFailThread(); try { stompDisconnect(); @@ -167,6 +173,8 @@ public class StompTestSupport { brokerService.setSchedulerSupport(true); brokerService.setPopulateJMSXUserID(true); brokerService.setSchedulerSupport(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.getManagementContext().setCreateMBeanServer(false); JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl(); jobStore.setDirectory(new File("activemq-data")); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTimeStampingBrokerPluginTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTimeStampingBrokerPluginTest.java index 633056e678..23f66556ee 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTimeStampingBrokerPluginTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTimeStampingBrokerPluginTest.java @@ -66,7 +66,7 @@ public class StompTimeStampingBrokerPluginTest { broker = new BrokerService(); broker.setPersistent(false); - broker.setUseJmx(true); + broker.setUseJmx(false); broker.setPlugins(new BrokerPlugin[] {tsbp}); connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); String stompConnectionUri = broker.addConnector("stomp://0.0.0.0:0").getPublishableConnectString(); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java index f2ca955e82..ab5069a0c0 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java @@ -52,6 +52,8 @@ public class StompVirtualTopicTest extends StompTestSupport { protected void createBroker() throws Exception { brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost")); brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.getManagementContext().setCreateMBeanServer(false); brokerService.setDeleteAllMessagesOnStartup(true); File testDataDir = new File("target/activemq-data/StompVirtualTopicTest"); @@ -82,7 +84,7 @@ public class StompVirtualTopicTest extends StompTestSupport { brokerService.setDestinationPolicy(policyMap); } - @Test + @Test(timeout = 60000) public void testStompOnVirtualTopics() throws Exception { LOG.info("Running Stomp Producer"); @@ -97,7 +99,7 @@ public class StompVirtualTopicTest extends StompTestSupport { StompFrame frame = stompConnection.receive(); assertTrue(frame.toString().startsWith("CONNECTED")); - for (int i=0; i headers = new HashMap(); + headers.put("receipt", "sub-1"); + stompConnection.subscribe("/queue/Consumer.A.VirtualTopic.FOO", "auto", headers); + + String receipt = stompConnection.receiveFrame(); + assertTrue("Should have read a receipt for subscribe", receipt.contains("RECEIPT")); + assertTrue("Receipt contains receipt-id", receipt.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + latch.countDown(); for (counter=0; counter