git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1434956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-17 23:21:54 +00:00
parent 5d4d7487e4
commit f5d8a05ed7
1 changed files with 40 additions and 24 deletions

View File

@ -16,18 +16,23 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport; import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -38,7 +43,6 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class MessageGroupLateArrivalsTest extends JmsTestSupport { public class MessageGroupLateArrivalsTest extends JmsTestSupport {
public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class); public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class);
protected Connection connection; protected Connection connection;
@ -60,6 +64,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
junit.textui.TestRunner.run(suite()); junit.textui.TestRunner.run(suite());
} }
@Override
public void setUp() throws Exception { public void setUp() throws Exception {
broker = createBroker(); broker = createBroker();
broker.start(); broker.start();
@ -71,6 +76,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
connection.start(); connection.start();
} }
@Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService(); BrokerService service = new BrokerService();
service.setPersistent(false); service.setPersistent(false);
@ -86,6 +92,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
return service; return service;
} }
@Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
producer.close(); producer.close();
session.close(); session.close();
@ -99,20 +106,25 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
int[] counters = {perBatch, perBatch, perBatch}; int[] counters = {perBatch, perBatch, perBatch};
CountDownLatch startSignal = new CountDownLatch(0); CountDownLatch startSignal = new CountDownLatch(0);
CountDownLatch doneSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(3);
CountDownLatch worker1Started = new CountDownLatch(1);
CountDownLatch worker2Started = new CountDownLatch(1);
CountDownLatch worker3Started = new CountDownLatch(1);
messageCount.put("worker1", 0); messageCount.put("worker1", 0);
messageGroups.put("worker1", new HashSet<String>()); messageGroups.put("worker1", new HashSet<String>());
Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started);
messageCount.put("worker2", 0); messageCount.put("worker2", 0);
messageGroups.put("worker2", new HashSet<String>()); messageGroups.put("worker2", new HashSet<String>());
Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started);
messageCount.put("worker3", 0); messageCount.put("worker3", 0);
messageGroups.put("worker3", new HashSet<String>()); messageGroups.put("worker3", new HashSet<String>());
Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups); Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups, worker3Started);
new Thread(worker1).start(); new Thread(worker1).start();
new Thread(worker2).start(); new Thread(worker2).start();
worker1Started.await();
worker2Started.await();
for (int i = 0; i < perBatch; i++) { for (int i = 0; i < perBatch; i++) {
Message msga = session.createTextMessage("hello a"); Message msga = session.createTextMessage("hello a");
@ -128,11 +140,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
new Thread(worker3).start(); new Thread(worker3).start();
// wait for presence before new group // wait for presence before new group
TimeUnit.SECONDS.sleep(4); worker3Started.await();
// ensure worker 3 is not next in line with normal dispatch
//Message msga = session.createTextMessage("hello to who ever is next in line");
//producer.send(msga);
for (int i = 0; i < perBatch; i++) { for (int i = 0; i < perBatch; i++) {
Message msgc = session.createTextMessage("hello c"); Message msgc = session.createTextMessage("hello c");
@ -142,8 +150,13 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
doneSignal.await(); doneSignal.await();
for (String worker : messageCount.keySet()) { List<String> workers = new ArrayList<String>(messageCount.keySet());
Collections.sort(workers);
for (String worker : workers) {
log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)); log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
}
for (String worker : workers) {
assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
, perBatch, messageCount.get(worker).intValue()); , perBatch, messageCount.get(worker).intValue());
assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
@ -157,14 +170,16 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
int[] counters = {perBatch, perBatch, perBatch}; int[] counters = {perBatch, perBatch, perBatch};
CountDownLatch startSignal = new CountDownLatch(0); CountDownLatch startSignal = new CountDownLatch(0);
CountDownLatch doneSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(2);
CountDownLatch worker1Started = new CountDownLatch(1);
CountDownLatch worker2Started = new CountDownLatch(1);
messageCount.put("worker1", 0); messageCount.put("worker1", 0);
messageGroups.put("worker1", new HashSet<String>()); messageGroups.put("worker1", new HashSet<String>());
Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started);
messageCount.put("worker2", 0); messageCount.put("worker2", 0);
messageGroups.put("worker2", new HashSet<String>()); messageGroups.put("worker2", new HashSet<String>());
Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started);
new Thread(worker1).start(); new Thread(worker1).start();
@ -182,7 +197,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
new Thread(worker2).start(); new Thread(worker2).start();
// wait for presence before new group // wait for presence before new group
TimeUnit.SECONDS.sleep(4); worker2Started.await();
for (int i = 0; i < perBatch; i++) { for (int i = 0; i < perBatch; i++) {
Message msgc = session.createTextMessage("hello a"); Message msgc = session.createTextMessage("hello a");
@ -211,12 +226,13 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
private String workerName = null; private String workerName = null;
private CountDownLatch startSignal = null; private CountDownLatch startSignal = null;
private CountDownLatch doneSignal = null; private CountDownLatch doneSignal = null;
private CountDownLatch workerStarted = null;
private int[] counters = null; private int[] counters = null;
private HashMap<String, Integer> messageCount; private final HashMap<String, Integer> messageCount;
private HashMap<String, Set<String>> messageGroups; private final HashMap<String, Set<String>> messageGroups;
private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal,
private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups) { int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups, CountDownLatch workerStarted) {
this.connection = connection; this.connection = connection;
this.queueName = queueName; this.queueName = queueName;
this.workerName = workerName; this.workerName = workerName;
@ -225,6 +241,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
this.counters = counters; this.counters = counters;
this.messageCount = messageCount; this.messageCount = messageCount;
this.messageGroups = messageGroups; this.messageGroups = messageGroups;
this.workerStarted = workerStarted;
} }
private void update(String group) { private void update(String group) {
@ -235,6 +252,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
messageGroups.put(workerName, groups); messageGroups.put(workerName, groups);
} }
@Override
public void run() { public void run() {
try { try {
@ -242,6 +260,7 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
log.info(workerName); log.info(workerName);
Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = sess.createConsumer(queueName); MessageConsumer consumer = sess.createConsumer(queueName);
workerStarted.countDown();
while (true) { while (true) {
if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) { if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) {
@ -257,20 +276,17 @@ public class MessageGroupLateArrivalsTest extends JmsTestSupport {
msg.acknowledge(); msg.acknowledge();
String group = msg.getStringProperty("JMSXGroupID"); String group = msg.getStringProperty("JMSXGroupID");
boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); msg.getBooleanProperty("JMSXGroupFirstForConsumer");
if ("A".equals(group)) { if ("A".equals(group)) {
--counters[0]; --counters[0];
update(group); update(group);
//Thread.sleep(500);
} else if ("B".equals(group)) { } else if ("B".equals(group)) {
--counters[1]; --counters[1];
update(group); update(group);
//Thread.sleep(100);
} else if ("C".equals(group)) { } else if ("C".equals(group)) {
--counters[2]; --counters[2];
update(group); update(group);
//Thread.sleep(10);
} else { } else {
log.warn(workerName + ", unknown group"); log.warn(workerName + ", unknown group");
} }