diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index cd8804857e..bb583979c8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -210,11 +210,19 @@ public class Queue extends BaseDestination implements Task, UsageListener { private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); - private static final Comparator orderedCompare = new Comparator() { + private final Comparator orderedCompare = new Comparator() { public int compare(Subscription s1, Subscription s2) { // We want the list sorted in descending order - return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); + int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); + if (val == 0 && messageGroupOwners != null) { + // then ascending order of assigned message groups to favour less loaded consumers + // Long.compare in jdk7 + long x = s1.getConsumerInfo().getLastDeliveredSequenceId(); + long y = s2.getConsumerInfo().getLastDeliveredSequenceId(); + val = (x < y) ? -1 : ((x == y) ? 0 : 1); + } + return val; } }; @@ -1934,8 +1942,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { String groupId = node.getGroupID(); int sequence = node.getGroupSequence(); if (groupId != null) { - //MessageGroupMap messageGroupOwners = ((Queue) node - // .getRegionDestination()).getMessageGroupOwners(); MessageGroupMap messageGroupOwners = getMessageGroupOwners(); // If we can own the first, then no-one else should own the @@ -1956,6 +1962,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // A group sequence < 1 is an end of group signal. if (sequence < 0) { messageGroupOwners.removeGroup(groupId); + subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1); } } else { result = false; @@ -1979,6 +1986,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { LOG.warn("Failed to set boolean header: " + e, e); } } + subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1); } protected void pageInMessages(boolean force) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java index 650a7f57d2..c1ef735b2d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -61,6 +61,7 @@ public class ConsumerInfo extends BaseCommand { // not marshalled, populated from RemoveInfo, the last message delivered, used // to suppress redelivery on prefetched messages after close + // overload; also used at runtime to track assignment of message groups private transient long lastDeliveredSequenceId; // originated from a diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java new file mode 100644 index 0000000000..68f449ebf9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MessageGroupLateArrivalsTest extends JmsTestSupport { + public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class); + protected Connection connection; + protected Session session; + protected MessageProducer producer; + protected Destination destination; + + BrokerService broker; + protected TransportConnector connector; + + protected HashMap messageCount = new HashMap(); + protected HashMap> messageGroups = new HashMap>(); + + public static Test suite() { + return suite(MessageGroupLateArrivalsTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1000"); + connection = connFactory.createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = new ActiveMQQueue("test-queue2"); + producer = session.createProducer(destination); + connection.start(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setUseConsumerPriority(true); + policyMap.setDefaultEntry(policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + public void tearDown() throws Exception { + producer.close(); + session.close(); + connection.close(); + broker.stop(); + } + + public void testConsumersLateToThePartyGetSomeNewGroups() throws Exception { + + final int perBatch = 3; + int[] counters = {perBatch, perBatch, perBatch}; + + CountDownLatch startSignal = new CountDownLatch(0); + CountDownLatch doneSignal = new CountDownLatch(1); + + messageCount.put("worker1", 0); + messageGroups.put("worker1", new HashSet()); + Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); + messageCount.put("worker2", 0); + messageGroups.put("worker2", new HashSet()); + Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); + messageCount.put("worker3", 0); + messageGroups.put("worker3", new HashSet()); + Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups); + + new Thread(worker1).start(); + new Thread(worker2).start(); + + for (int i = 0; i < perBatch; i++) { + Message msga = session.createTextMessage("hello a"); + msga.setStringProperty("JMSXGroupID", "A"); + producer.send(msga); + + Message msgb = session.createTextMessage("hello b"); + msgb.setStringProperty("JMSXGroupID", "B"); + producer.send(msgb); + } + + // ensure this chap, late to the party gets a new group + new Thread(worker3).start(); + + // wait for presence before new group + TimeUnit.SECONDS.sleep(4); + + // ensure worker 3 is not next in line with normal dispatch + //Message msga = session.createTextMessage("hello to who ever is next in line"); + //producer.send(msga); + + for (int i = 0; i < perBatch; i++) { + Message msgc = session.createTextMessage("hello c"); + msgc.setStringProperty("JMSXGroupID", "C"); + producer.send(msgc); + } + + doneSignal.await(); + + for (String worker : messageCount.keySet()) { + log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)); + assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) + , perBatch, messageCount.get(worker).intValue()); + assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) + , 1, messageGroups.get(worker).size()); + } + } + + public void testConsumerLateToBigPartyGetsNewGroup() throws Exception { + + final int perBatch = 2; + int[] counters = {perBatch, perBatch, perBatch}; + + CountDownLatch startSignal = new CountDownLatch(0); + CountDownLatch doneSignal = new CountDownLatch(1); + + messageCount.put("worker1", 0); + messageGroups.put("worker1", new HashSet()); + Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups); + messageCount.put("worker2", 0); + messageGroups.put("worker2", new HashSet()); + Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups); + + new Thread(worker1).start(); + + for (int i = 0; i < perBatch; i++) { + Message msga = session.createTextMessage("hello c"); + msga.setStringProperty("JMSXGroupID", "A"); + producer.send(msga); + + Message msgb = session.createTextMessage("hello b"); + msgb.setStringProperty("JMSXGroupID", "B"); + producer.send(msgb); + } + + // ensure this chap, late to the party gets a new group + new Thread(worker2).start(); + + // wait for presence before new group + TimeUnit.SECONDS.sleep(4); + + for (int i = 0; i < perBatch; i++) { + Message msgc = session.createTextMessage("hello a"); + msgc.setStringProperty("JMSXGroupID", "C"); + producer.send(msgc); + } + + doneSignal.await(); + + log.info("worker1 received " + messageCount.get("worker1") + " messages from groups " + messageGroups.get("worker1")); + assertEquals("worker1 received " + messageCount.get("worker1") + " messages from groups " + messageGroups.get("worker1") + , 2 * perBatch, messageCount.get("worker1").intValue()); + assertEquals("worker1 received " + messageCount.get("worker1") + " messages from groups " + messageGroups.get("worker1") + , 2, messageGroups.get("worker1").size()); + + log.info("worker2 received " + messageCount.get("worker2") + " messages from groups " + messageGroups.get("worker2")); + assertEquals("worker2 received " + messageCount.get("worker2") + " messages from groups " + messageGroups.get("worker2") + , 2 * perBatch, messageCount.get("worker1").intValue()); + assertEquals("worker2 received " + messageCount.get("worker2") + " messages from groups " + messageGroups.get("worker2") + , 1, messageGroups.get("worker2").size()); + } + + private static final class Worker implements Runnable { + private Connection connection = null; + private Destination queueName = null; + private String workerName = null; + private CountDownLatch startSignal = null; + private CountDownLatch doneSignal = null; + private int[] counters = null; + private HashMap messageCount; + private HashMap> messageGroups; + + + private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap messageCount, HashMap> messageGroups) { + this.connection = connection; + this.queueName = queueName; + this.workerName = workerName; + this.startSignal = startSignal; + this.doneSignal = doneSignal; + this.counters = counters; + this.messageCount = messageCount; + this.messageGroups = messageGroups; + } + + private void update(String group) { + int msgCount = messageCount.get(workerName); + messageCount.put(workerName, msgCount + 1); + Set groups = messageGroups.get(workerName); + groups.add(group); + messageGroups.put(workerName, groups); + } + + public void run() { + + try { + startSignal.await(); + log.info(workerName); + Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = sess.createConsumer(queueName); + + while (true) { + if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) { + doneSignal.countDown(); + log.info(workerName + " done..."); + break; + } + + Message msg = consumer.receive(500); + if (msg == null) + continue; + + msg.acknowledge(); + + String group = msg.getStringProperty("JMSXGroupID"); + boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); + + if ("A".equals(group)) { + --counters[0]; + update(group); + //Thread.sleep(500); + } else if ("B".equals(group)) { + --counters[1]; + update(group); + //Thread.sleep(100); + } else if ("C".equals(group)) { + --counters[2]; + update(group); + //Thread.sleep(10); + } else { + log.warn(workerName + ", unknown group"); + } + if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0) { + msg.acknowledge(); + } + } + consumer.close(); + sess.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +}