diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 43f02f84fa..f8373ac5ee 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -224,8 +224,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (val == 0 && messageGroupOwners != null) { // then ascending order of assigned message groups to favour less loaded consumers // Long.compare in jdk7 - long x = s1.getConsumerInfo().getLastDeliveredSequenceId(); - long y = s2.getConsumerInfo().getLastDeliveredSequenceId(); + long x = s1.getConsumerInfo().getAssignedGroupCount(); + long y = s2.getConsumerInfo().getAssignedGroupCount(); val = (x < y) ? -1 : ((x == y) ? 0 : 1); } return val; @@ -429,7 +429,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { - LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); + LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); super.addSubscription(context, sub); // synchronize with dispatch method so that no new messages are sent @@ -500,13 +500,14 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // while removing up a subscription. pagedInPendingDispatchLock.writeLock().lock(); try { - LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ + LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, lastDeiveredSequenceId, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), - getDestinationStatistics().getInflight().getCount() + getDestinationStatistics().getInflight().getCount(), + sub.getConsumerInfo().getAssignedGroupCount() }); consumersLock.writeLock().lock(); try { @@ -1975,19 +1976,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // dispatched before. pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); } - // and now see if we can dispatch the new stuff.. and append to the pending - // list anything that does not actually get dispatched. - if (list != null && !list.isEmpty()) { - if (pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); - } else { - for (MessageReference qmr : list) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); - } + } + // and now see if we can dispatch the new stuff.. and append to the pending + // list anything that does not actually get dispatched. + if (list != null && !list.isEmpty()) { + if (redeliveredWaitingDispatch.isEmpty() && pagedInPendingDispatch.isEmpty()) { + pagedInPendingDispatch.addAll(doActualDispatch(list)); + } else { + for (MessageReference qmr : list) { + if (!pagedInPendingDispatch.contains(qmr)) { + pagedInPendingDispatch.addMessageLast(qmr); } - doWakeUp = true; } + doWakeUp = true; } } } finally { @@ -2006,7 +2007,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index */ private PendingList doActualDispatch(PendingList list) throws Exception { List consumers; - consumersLock.writeLock().lock(); + consumersLock.readLock().lock(); try { if (this.consumers.isEmpty()) { @@ -2015,7 +2016,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } consumers = new ArrayList(this.consumers); } finally { - consumersLock.writeLock().unlock(); + consumersLock.readLock().unlock(); } Set fullConsumers = new HashSet(this.consumers.size()); @@ -2101,7 +2102,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // A group sequence < 1 is an end of group signal. if (sequence < 0) { messageGroupOwners.removeGroup(groupId); - subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1); + subscription.getConsumerInfo().decrementAssignedGroupCount(); } } else { result = false; @@ -2117,7 +2118,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); Message message = n.getMessage(); message.setJMSXGroupFirstForConsumer(true); - subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1); + subs.getConsumerInfo().incrementAssignedGroupCount(); } protected void pageInMessages(boolean force) throws Exception { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java index 01f224390f..c9fc3e6340 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -62,9 +62,8 @@ public class ConsumerInfo extends BaseCommand { // not marshalled, populated from RemoveInfo, the last message delivered, used // to suppress redelivery on prefetched messages after close - // overload; also used at runtime to track assignment of message groups private transient long lastDeliveredSequenceId; - + private transient long assignedGroupCount; // originated from a // network connection @@ -495,4 +494,16 @@ public class ConsumerInfo extends BaseCommand { return lastDeliveredSequenceId; } + public void incrementAssignedGroupCount() { + this.assignedGroupCount++; + } + + public void decrementAssignedGroupCount() { + this.assignedGroupCount--; + } + + public long getAssignedGroupCount() { + return assignedGroupCount; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java new file mode 100644 index 0000000000..4799076950 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(BlockJUnit4ClassRunner.class) +public class MessageGroupReconnectDistributionTest { + public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class); + protected Connection connection; + protected Session session; + protected MessageProducer producer; + protected Destination destination; + + BrokerService broker; + protected TransportConnector connector; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=30"); + connection = connFactory.createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = new ActiveMQQueue("GroupQ"); + producer = session.createProducer(destination); + connection.start(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(true); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setUseConsumerPriority(true); + policy.setMessageGroupMapFactoryType("cached"); + policyMap.setDefaultEntry(policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + @After + public void tearDown() throws Exception { + producer.close(); + session.close(); + connection.close(); + broker.stop(); + } + + final Random random = new Random(); + public int getBatchSize(int bound) throws Exception { + return bound + random.nextInt(bound); + } + + @Test(timeout = 20 * 60 * 1000) + public void testReconnect() throws Exception { + + final int numMessages = 50000; + final int numConsumers = 10; + final AtomicLong totalConsumed = new AtomicLong(0); + + produceMessages(numMessages); + + ExecutorService executorService = Executors.newCachedThreadPool(); + final ArrayList consumedCounters = new ArrayList(numConsumers); + for (int i=0;i 0 && consumed.longValue() % batchSize == 0) { + messageConsumer.close(); + messageConsumer = connectionSession.createConsumer(destination); + batchSize = getBatchSize(numConsumers); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + assertTrue("threads done on time", executorService.awaitTermination(10, TimeUnit.MINUTES)); + + assertEquals("All consumed", numMessages, totalConsumed.intValue()); + + LOG.info("Distribution: " + consumedCounters); + + double max = consumedCounters.get(0).longValue() * 1.5; + double min = consumedCounters.get(0).longValue() * 0.5; + + for (AtomicLong l : consumedCounters) { + assertTrue("Even +/- 50% distribution on consumed:" + consumedCounters + ", outlier:" + l.get(), + l.longValue() < max && l.longValue() > min); + } + } + + private void produceMessages(int numMessages) throws JMSException { + for (int i = 0; i < numMessages; i++) { + TextMessage msga = session.createTextMessage("hello " +i); + msga.setStringProperty("JMSXGroupID", msga.getText()); + producer.send(msga); + } + } +}