diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java index ab5069a0c0..cfa28b551a 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.management.ObjectName; @@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory; public class StompVirtualTopicTest extends StompTestSupport { private static final Logger LOG = LoggerFactory.getLogger(StompVirtualTopicTest.class); - private static final int NUM_MSGS = 100000; + private static final int NUM_MSGS = 30000; private String failMsg = null; @@ -75,7 +76,7 @@ public class StompVirtualTopicTest extends StompTestSupport { final PolicyEntry entry = new PolicyEntry(); entry.setQueue(">"); entry.setProducerFlowControl(false); - entry.setMemoryLimit(10485760); + entry.setMemoryLimit(262144); entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); policyEntries.add(entry); @@ -84,13 +85,18 @@ public class StompVirtualTopicTest extends StompTestSupport { brokerService.setDestinationPolicy(policyMap); } - @Test(timeout = 60000) + @Test(timeout = 90000) public void testStompOnVirtualTopics() throws Exception { LOG.info("Running Stomp Producer"); StompConsumer consumerWorker = new StompConsumer(this); Thread consumer = new Thread(consumerWorker); + StringBuilder payload = new StringBuilder(); + for (int i = 0; i < 128; ++i) { + payload.append('*'); + } + consumer.start(); consumerWorker.awaitStartCompleted(); Thread.sleep(500); @@ -100,22 +106,21 @@ public class StompVirtualTopicTest extends StompTestSupport { assertTrue(frame.toString().startsWith("CONNECTED")); for (int i = 0; i < NUM_MSGS - 1; i++) { - stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "}"); + stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "} " + payload.toString()); } LOG.info("Sending last packet with receipt header"); HashMap headers = new HashMap(); headers.put("receipt", "1234"); stompConnection.appendHeaders(headers); - String msg = "SEND\n" + "destination:/topic/VirtualTopic.FOO\n" + - "receipt: msg-1\n" + "\n\n" + "Hello World {" + (NUM_MSGS-1) + "}" + Stomp.NULL; + String msg = "SEND\n" + "destination:/topic/VirtualTopic.FOO\n" + "receipt: msg-1\n" + "\n\n" + "Hello World {" + (NUM_MSGS - 1) + "}" + Stomp.NULL; stompConnection.sendFrame(msg); msg = stompConnection.receiveFrame(); assertTrue(msg.contains("RECEIPT")); stompConnection.disconnect(); - Thread.sleep(1000); + TimeUnit.MILLISECONDS.sleep(100); stompConnection.close(); LOG.info("Stomp Producer finished. Waiting for consumer to join."); @@ -131,8 +136,8 @@ public class StompVirtualTopicTest extends StompTestSupport { } /* - * Allow Consumer thread to indicate the test has failed. - * JUnits Assert.fail() does not work in threads spawned. + * Allow Consumer thread to indicate the test has failed. JUnits + * Assert.fail() does not work in threads spawned. */ protected void setFail(String msg) { this.failMsg = msg; @@ -180,10 +185,10 @@ public class StompVirtualTopicTest extends StompTestSupport { latch.countDown(); - for (counter=0; counter