Shorten the test a bit with tigther memory limits which reduces the

amount of messages we need to send to fill and trigger plist store use.
Also add some time to the timeout to account for very slow disk I/O.
This commit is contained in:
Timothy Bish 2015-04-28 12:19:37 -04:00
parent f5283a9045
commit d2248e92d7
1 changed files with 30 additions and 31 deletions

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class StompVirtualTopicTest extends StompTestSupport { public class StompVirtualTopicTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompVirtualTopicTest.class); private static final Logger LOG = LoggerFactory.getLogger(StompVirtualTopicTest.class);
private static final int NUM_MSGS = 100000; private static final int NUM_MSGS = 30000;
private String failMsg = null; private String failMsg = null;
@ -75,7 +76,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
final PolicyEntry entry = new PolicyEntry(); final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">"); entry.setQueue(">");
entry.setProducerFlowControl(false); entry.setProducerFlowControl(false);
entry.setMemoryLimit(10485760); entry.setMemoryLimit(262144);
entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
policyEntries.add(entry); policyEntries.add(entry);
@ -84,13 +85,18 @@ public class StompVirtualTopicTest extends StompTestSupport {
brokerService.setDestinationPolicy(policyMap); brokerService.setDestinationPolicy(policyMap);
} }
@Test(timeout = 60000) @Test(timeout = 90000)
public void testStompOnVirtualTopics() throws Exception { public void testStompOnVirtualTopics() throws Exception {
LOG.info("Running Stomp Producer"); LOG.info("Running Stomp Producer");
StompConsumer consumerWorker = new StompConsumer(this); StompConsumer consumerWorker = new StompConsumer(this);
Thread consumer = new Thread(consumerWorker); Thread consumer = new Thread(consumerWorker);
StringBuilder payload = new StringBuilder();
for (int i = 0; i < 128; ++i) {
payload.append('*');
}
consumer.start(); consumer.start();
consumerWorker.awaitStartCompleted(); consumerWorker.awaitStartCompleted();
Thread.sleep(500); Thread.sleep(500);
@ -100,22 +106,21 @@ public class StompVirtualTopicTest extends StompTestSupport {
assertTrue(frame.toString().startsWith("CONNECTED")); assertTrue(frame.toString().startsWith("CONNECTED"));
for (int i = 0; i < NUM_MSGS - 1; i++) { for (int i = 0; i < NUM_MSGS - 1; i++) {
stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "}"); stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "} " + payload.toString());
} }
LOG.info("Sending last packet with receipt header"); LOG.info("Sending last packet with receipt header");
HashMap<String, Object> headers = new HashMap<String, Object>(); HashMap<String, Object> headers = new HashMap<String, Object>();
headers.put("receipt", "1234"); headers.put("receipt", "1234");
stompConnection.appendHeaders(headers); stompConnection.appendHeaders(headers);
String msg = "SEND\n" + "destination:/topic/VirtualTopic.FOO\n" + String msg = "SEND\n" + "destination:/topic/VirtualTopic.FOO\n" + "receipt: msg-1\n" + "\n\n" + "Hello World {" + (NUM_MSGS - 1) + "}" + Stomp.NULL;
"receipt: msg-1\n" + "\n\n" + "Hello World {" + (NUM_MSGS-1) + "}" + Stomp.NULL;
stompConnection.sendFrame(msg); stompConnection.sendFrame(msg);
msg = stompConnection.receiveFrame(); msg = stompConnection.receiveFrame();
assertTrue(msg.contains("RECEIPT")); assertTrue(msg.contains("RECEIPT"));
stompConnection.disconnect(); stompConnection.disconnect();
Thread.sleep(1000); TimeUnit.MILLISECONDS.sleep(100);
stompConnection.close(); stompConnection.close();
LOG.info("Stomp Producer finished. Waiting for consumer to join."); LOG.info("Stomp Producer finished. Waiting for consumer to join.");
@ -131,8 +136,8 @@ public class StompVirtualTopicTest extends StompTestSupport {
} }
/* /*
* Allow Consumer thread to indicate the test has failed. * Allow Consumer thread to indicate the test has failed. JUnits
* JUnits Assert.fail() does not work in threads spawned. * Assert.fail() does not work in threads spawned.
*/ */
protected void setFail(String msg) { protected void setFail(String msg) {
this.failMsg = msg; this.failMsg = msg;
@ -180,10 +185,10 @@ public class StompVirtualTopicTest extends StompTestSupport {
latch.countDown(); latch.countDown();
for (counter=0; counter<StompVirtualTopicTest.NUM_MSGS; counter++) { for (counter = 0; counter < StompVirtualTopicTest.NUM_MSGS; counter++) {
frame = stompConnection.receive(15000); frame = stompConnection.receive(15000);
log.trace("Received msg with content: " + frame.getBody()); log.trace("Received msg with content: " + frame.getBody());
if(!received.add(frame.getBody())) { if (!received.add(frame.getBody())) {
dups.add(frame.getBody()); dups.add(frame.getBody());
} }
} }
@ -193,8 +198,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
frame = stompConnection.receive(3000); frame = stompConnection.receive(3000);
assertNull(frame); assertNull(frame);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Correctly received " + e + " while trying to consume an additional msg." + LOG.info("Correctly received " + e + " while trying to consume an additional msg." + " This is expected as the queue should be empty now.");
" This is expected as the queue should be empty now.");
} }
// in addition check QueueSize using JMX // in addition check QueueSize using JMX
@ -204,16 +208,15 @@ public class StompVirtualTopicTest extends StompTestSupport {
parent.setFail("QueueSize not 0 after test has finished."); parent.setFail("QueueSize not 0 after test has finished.");
} }
log.debug("Stomp Consumer Received " + counter + " of " + StompVirtualTopicTest.NUM_MSGS + log.debug("Stomp Consumer Received " + counter + " of " + StompVirtualTopicTest.NUM_MSGS
" messages. Check QueueSize in JMX and try to browse the queue."); + " messages. Check QueueSize in JMX and try to browse the queue.");
if(!dups.isEmpty()) { if (!dups.isEmpty()) {
for(String msg : dups) { for (String msg : dups) {
LOG.debug("Received duplicate message: " + msg); LOG.debug("Received duplicate message: " + msg);
} }
parent.setFail("Received " + StompVirtualTopicTest.NUM_MSGS + parent.setFail("Received " + StompVirtualTopicTest.NUM_MSGS + " messages but " + dups.size() + " were dups.");
" messages but " + dups.size() + " were dups.");
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -224,13 +227,13 @@ public class StompVirtualTopicTest extends StompTestSupport {
} catch (Exception e) { } catch (Exception e) {
} }
parent.setFail("Stomp Consumer received " + counter + " of " + StompVirtualTopicTest.NUM_MSGS + parent.setFail("Stomp Consumer received " + counter + " of " + StompVirtualTopicTest.NUM_MSGS
" messages. Check QueueSize in JMX and try to browse the queue."); + " messages. Check QueueSize in JMX and try to browse the queue.");
} finally { } finally {
try { try {
stompConnection.disconnect(); stompConnection.disconnect();
Thread.sleep(1000); TimeUnit.MILLISECONDS.sleep(100);
stompConnection.close(); stompConnection.close();
} catch (Exception e) { } catch (Exception e) {
log.error("unexpected exception on sleep", e); log.error("unexpected exception on sleep", e);
@ -242,18 +245,14 @@ public class StompVirtualTopicTest extends StompTestSupport {
private long reportQueueStatistics() throws Exception { private long reportQueueStatistics() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:destinationType=Queue" + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:destinationType=Queue" + ",destinationName=Consumer.A.VirtualTopic.FOO"
",destinationName=Consumer.A.VirtualTopic.FOO" + + ",type=Broker,brokerName=localhost");
",type=Broker,brokerName=localhost"); QueueViewMBean queue = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queue = (QueueViewMBean)
brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
LOG.info("Consumer.A.VirtualTopic.FOO Inflight: " + queue.getInFlightCount() + LOG.info("Consumer.A.VirtualTopic.FOO Inflight: " + queue.getInFlightCount() + ", enqueueCount: " + queue.getEnqueueCount() + ", dequeueCount: "
", enqueueCount: " + queue.getEnqueueCount() + ", dequeueCount: " + + queue.getDequeueCount() + ", dispatchCount: " + queue.getDispatchCount());
queue.getDequeueCount() + ", dispatchCount: " + queue.getDispatchCount());
return queue.getQueueSize(); return queue.getQueueSize();
} }
} }
} }