From a321204079f435deaf8d082fb3de95c5c792c7f2 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 23 Jan 2009 12:11:00 +0000 Subject: [PATCH] fix for http://issues.apache.org/activemq/browse/AMQ-2016 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@737017 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 67 ++++- .../broker/region/policy/PolicyEntry.java | 21 +- .../usecases/MessageGroupDelayedTest.java | 229 ++++++++++++++++++ 3 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index c96cc5865a..c94465d73f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -27,12 +27,16 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -96,6 +100,10 @@ public class Queue extends BaseDestination implements Task { private boolean strictOrderDispatch=false; private QueueDispatchSelector dispatchSelector; private boolean optimizedDispatch=false; + private boolean firstConsumer = false; + private int timeBeforeDispatchStarts = 0; + private int consumersBeforeDispatchStarts = 0; + private CountDownLatch consumersBeforeStartsLatch; private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { wakeup(); @@ -134,7 +142,7 @@ public class Queue extends BaseDestination implements Task { } // If a VMPendingMessageCursor don't use the default Producer System Usage // since it turns into a shared blocking queue which can lead to a network deadlock. - // If we are ccursoring to disk..it's not and issue because it does not block due + // If we are cursoring to disk..it's not and issue because it does not block due // to large disk sizes. if( messages instanceof VMPendingMessageCursor ) { this.systemUsage = brokerService.getSystemUsage(); @@ -221,6 +229,18 @@ public class Queue extends BaseDestination implements Task { // needs to be synchronized - so no contention with dispatching synchronized (consumers) { + + // set a flag if this is a first consumer + if (consumers.size() == 0) { + firstConsumer = true; + } else { + firstConsumer = false; + } + + if (consumersBeforeStartsLatch != null) { + consumersBeforeStartsLatch.countDown(); + } + addToConsumerList(sub); if (sub.getConsumerInfo().isExclusive()) { Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); @@ -610,6 +630,22 @@ public class Queue extends BaseDestination implements Task { public void setOptimizedDispatch(boolean optimizedDispatch) { this.optimizedDispatch = optimizedDispatch; } + public int getTimeBeforeDispatchStarts() { + return timeBeforeDispatchStarts; + } + + public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { + this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; + } + + public int getConsumersBeforeDispatchStarts() { + return consumersBeforeDispatchStarts; + } + + public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { + this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; + consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts); + } // Implementation methods // ------------------------------------------------------------------------- @@ -990,6 +1026,35 @@ public class Queue extends BaseDestination implements Task { } } + if (firstConsumer) { + firstConsumer = false; + try { + if (consumersBeforeDispatchStarts > 0) { + int timeout = 1000; // wait one second by default if consumer count isn't reached + if (timeBeforeDispatchStarts > 0) { + timeout = timeBeforeDispatchStarts; + } + if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { + if (LOG.isDebugEnabled()) { + LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch."); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch."); + } + } + } + if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { + iteratingMutex.wait(timeBeforeDispatchStarts); + if (LOG.isDebugEnabled()) { + LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch."); + } + } + } catch (Exception e) { + LOG.error(e); + } + } + boolean pageInMoreMessages = false; synchronized (messages) { pageInMoreMessages = !messages.isEmpty(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 10dd3be471..7d99759db7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -65,6 +65,8 @@ public class PolicyEntry extends DestinationMapEntry { private boolean useConsumerPriority=true; private boolean strictOrderDispatch=false; private boolean lazyDispatch=false; + private int timeBeforeDispatchStarts = 0; + private int consumersBeforeDispatchStarts = 0; private boolean advisoryForSlowConsumers; private boolean advisdoryForFastProducers; private boolean advisoryForDiscardingMessages; @@ -93,7 +95,8 @@ public class PolicyEntry extends DestinationMapEntry { queue.setStrictOrderDispatch(isStrictOrderDispatch()); queue.setOptimizedDispatch(isOptimizedDispatch()); queue.setLazyDispatch(isLazyDispatch()); - + queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); + queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); } public void configure(Topic topic) { @@ -439,6 +442,22 @@ public class PolicyEntry extends DestinationMapEntry { this.lazyDispatch = lazyDispatch; } + public int getTimeBeforeDispatchStarts() { + return timeBeforeDispatchStarts; + } + + public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { + this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; + } + + public int getConsumersBeforeDispatchStarts() { + return consumersBeforeDispatchStarts; + } + + public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { + this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; + } + /** * @return the advisoryForSlowConsumers */ diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java new file mode 100644 index 0000000000..574152f56f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java @@ -0,0 +1,229 @@ +package org.apache.activemq.usecases; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JMSConsumerTest; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + + +public class MessageGroupDelayedTest extends JmsTestSupport { + public static final Log log = LogFactory.getLog(MessageGroupDelayedTest.class); + protected Connection connection; + protected Session session; + protected MessageProducer producer; + protected Destination destination; + + public int consumersBeforeDispatchStarts; + public int timeBeforeDispatchStarts; + + BrokerService broker; + protected TransportConnector connector; + + protected HashMap messageCount = new HashMap(); + protected HashMap> messageGroups = new HashMap>(); + + public static Test suite() { + return suite(MessageGroupDelayedTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri()); + //ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + connection = connFactory.createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = new ActiveMQQueue("test-queue2"); + producer = session.createProducer(destination); + connection.start(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + + // Setup a destination policy where it takes only 1 message at a time. + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts); + policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts); + policyMap.setDefaultEntry(policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + public void tearDown() throws Exception { + producer.close(); + session.close(); + connection.close(); + } + + + + public void initCombosForTestDelayedDirectConnectionListener() { + addCombinationValues("consumersBeforeDispatchStarts", new Object[] {0, 3, 5}); + addCombinationValues("timeBeforeDispatchStarts", new Object[] {0, 100}); + } + + public void testDelayedDirectConnectionListener() throws Exception { + + for(int i = 0; i < 10; i++) { + Message msga = session.createTextMessage("hello a"); + msga.setStringProperty("JMSXGroupID", "A"); + producer.send(msga); + Message msgb = session.createTextMessage("hello b"); + msgb.setStringProperty("JMSXGroupID", "B"); + producer.send(msgb); + Message msgc = session.createTextMessage("hello c"); + msgc.setStringProperty("JMSXGroupID", "C"); + producer.send(msgc); + } + log.info("30 messages sent to group A/B/C"); + + int[] counters = {10, 10, 10}; + + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(1); + + messageCount.put("worker1", 0); + messageGroups.put("worker1", new HashSet()); + Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); + messageCount.put("worker2", 0); + messageGroups.put("worker2", new HashSet()); + Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); + messageCount.put("worker3", 0); + messageGroups.put("worker3", new HashSet()); + Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups); + + + new Thread(worker1).start(); + new Thread(worker2).start(); + new Thread(worker3).start(); + + startSignal.countDown(); + doneSignal.await(); + + // check results + if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) { + log.info("Ignoring results because both parameters are 0"); + return; + } + + for (String worker: messageCount.keySet()) { + log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)); + assertEquals(10, messageCount.get(worker).intValue()); + assertEquals(1, messageGroups.get(worker).size()); + } + + } + + private static final class Worker implements Runnable { + private Connection connection = null; + private Destination queueName = null; + private String workerName = null; + private CountDownLatch startSignal = null; + private CountDownLatch doneSignal = null; + private int[] counters = null; + private HashMap messageCount; + private HashMap>messageGroups; + + + private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap messageCount, HashMap>messageGroups) { + this.connection = connection; + this.queueName = queueName; + this.workerName = workerName; + this.startSignal = startSignal; + this.doneSignal = doneSignal; + this.counters = counters; + this.messageCount = messageCount; + this.messageGroups = messageGroups; + } + + private void update(String group) { + int msgCount = messageCount.get(workerName); + messageCount.put(workerName, msgCount + 1); + Set groups = messageGroups.get(workerName); + groups.add(group); + messageGroups.put(workerName, groups); + } + + public void run() { + + try { + log.info(workerName); + startSignal.await(); + Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = sess.createConsumer(queueName); + + while(true) { + if(counters[0] == 0 && counters[1] == 0 && counters[2] == 0 ) { + doneSignal.countDown(); + log.info(workerName + " done..."); + break; + } + + Message msg = consumer.receive(500); + if(msg == null) + continue; + + String group = msg.getStringProperty("JMSXGroupID"); + boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); + + if("A".equals(group)){ + --counters[0]; + update(group); + Thread.sleep(500); + } + else if("B".equals(group)) { + --counters[1]; + update(group); + Thread.sleep(100); + } + else if("C".equals(group)) { + --counters[2]; + update(group); + Thread.sleep(10); + } + else { + log.warn("unknown group"); + } + if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0 ) { + msg.acknowledge(); + } + } + consumer.close(); + sess.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +}