From f5d8a05ed7957f42c3a734891bc7f36f60891889 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 17 Jan 2013 23:21:54 +0000 Subject: [PATCH] apply patch for: https://issues.apache.org/jira/browse/AMQ-4260 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1434956 13f79535-47bb-0310-9956-ffa450edef68 --- .../MessageGroupLateArrivalsTest.java | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java index 68f449ebf9..53655e4f39 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java @@ -16,18 +16,23 @@ */ package org.apache.activemq.usecases; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + import junit.framework.Test; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTestSupport; import org.apache.activemq.broker.BrokerService; @@ -38,7 +43,6 @@ import org.apache.activemq.command.ActiveMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class MessageGroupLateArrivalsTest extends JmsTestSupport { public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class); protected Connection connection; @@ -60,6 +64,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { junit.textui.TestRunner.run(suite()); } + @Override public void setUp() throws Exception { broker = createBroker(); broker.start(); @@ -71,6 +76,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { connection.start(); } + @Override protected BrokerService createBroker() throws Exception { BrokerService service = new BrokerService(); service.setPersistent(false); @@ -86,6 +92,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { return service; } + @Override public void tearDown() throws Exception { producer.close(); session.close(); @@ -99,20 +106,25 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { int[] counters = {perBatch, perBatch, perBatch}; CountDownLatch startSignal = new CountDownLatch(0); - CountDownLatch doneSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(3); + CountDownLatch worker1Started = new CountDownLatch(1); + CountDownLatch worker2Started = new CountDownLatch(1); + CountDownLatch worker3Started = new CountDownLatch(1); messageCount.put("worker1", 0); messageGroups.put("worker1", new HashSet()); - Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); + Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started); messageCount.put("worker2", 0); messageGroups.put("worker2", new HashSet()); - Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); + Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started); messageCount.put("worker3", 0); messageGroups.put("worker3", new HashSet()); - Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups); + Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups, worker3Started); new Thread(worker1).start(); new Thread(worker2).start(); + worker1Started.await(); + worker2Started.await(); for (int i = 0; i < perBatch; i++) { Message msga = session.createTextMessage("hello a"); @@ -128,11 +140,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { new Thread(worker3).start(); // wait for presence before new group - TimeUnit.SECONDS.sleep(4); - - // ensure worker 3 is not next in line with normal dispatch - //Message msga = session.createTextMessage("hello to who ever is next in line"); - //producer.send(msga); + worker3Started.await(); for (int i = 0; i < perBatch; i++) { Message msgc = session.createTextMessage("hello c"); @@ -142,8 +150,13 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { doneSignal.await(); - for (String worker : messageCount.keySet()) { + List workers = new ArrayList(messageCount.keySet()); + Collections.sort(workers); + for (String worker : workers) { log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)); + } + + for (String worker : workers) { assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) , perBatch, messageCount.get(worker).intValue()); assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) @@ -157,14 +170,16 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { int[] counters = {perBatch, perBatch, perBatch}; CountDownLatch startSignal = new CountDownLatch(0); - CountDownLatch doneSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(2); + CountDownLatch worker1Started = new CountDownLatch(1); + CountDownLatch worker2Started = new CountDownLatch(1); messageCount.put("worker1", 0); messageGroups.put("worker1", new HashSet()); - Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); + Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started); messageCount.put("worker2", 0); messageGroups.put("worker2", new HashSet()); - Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); + Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started); new Thread(worker1).start(); @@ -182,7 +197,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { new Thread(worker2).start(); // wait for presence before new group - TimeUnit.SECONDS.sleep(4); + worker2Started.await(); for (int i = 0; i < perBatch; i++) { Message msgc = session.createTextMessage("hello a"); @@ -211,12 +226,13 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { private String workerName = null; private CountDownLatch startSignal = null; private CountDownLatch doneSignal = null; + private CountDownLatch workerStarted = null; private int[] counters = null; - private HashMap messageCount; - private HashMap> messageGroups; + private final HashMap messageCount; + private final HashMap> messageGroups; - - private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap messageCount, HashMap> messageGroups) { + private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, + int[] counters, HashMap messageCount, HashMap> messageGroups, CountDownLatch workerStarted) { this.connection = connection; this.queueName = queueName; this.workerName = workerName; @@ -225,6 +241,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { this.counters = counters; this.messageCount = messageCount; this.messageGroups = messageGroups; + this.workerStarted = workerStarted; } private void update(String group) { @@ -235,6 +252,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { messageGroups.put(workerName, groups); } + @Override public void run() { try { @@ -242,6 +260,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { log.info(workerName); Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = sess.createConsumer(queueName); + workerStarted.countDown(); while (true) { if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) { @@ -257,20 +276,17 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport { msg.acknowledge(); String group = msg.getStringProperty("JMSXGroupID"); - boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); + msg.getBooleanProperty("JMSXGroupFirstForConsumer"); if ("A".equals(group)) { --counters[0]; update(group); - //Thread.sleep(500); } else if ("B".equals(group)) { --counters[1]; update(group); - //Thread.sleep(100); } else if ("C".equals(group)) { --counters[2]; update(group); - //Thread.sleep(10); } else { log.warn(workerName + ", unknown group"); }