https://issues.apache.org/jira/browse/AMQ-2719 - resolve possible skipped dispatch - https://issues.apache.org/jira/browse/AMQ-2106 - remove overload so that distribution order is not compromised when consumers are removed with a valid last delivered sequence. Additional test that uses consumer recreation to redistribute groups

This commit is contained in:
gtully 2014-12-05 13:55:16 +00:00
parent 802e527ea4
commit 60ad053486
3 changed files with 227 additions and 22 deletions

View File

@ -224,8 +224,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (val == 0 && messageGroupOwners != null) { if (val == 0 && messageGroupOwners != null) {
// then ascending order of assigned message groups to favour less loaded consumers // then ascending order of assigned message groups to favour less loaded consumers
// Long.compare in jdk7 // Long.compare in jdk7
long x = s1.getConsumerInfo().getLastDeliveredSequenceId(); long x = s1.getConsumerInfo().getAssignedGroupCount();
long y = s2.getConsumerInfo().getLastDeliveredSequenceId(); long y = s2.getConsumerInfo().getAssignedGroupCount();
val = (x < y) ? -1 : ((x == y) ? 0 : 1); val = (x < y) ? -1 : ((x == y) ? 0 : 1);
} }
return val; return val;
@ -429,7 +429,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override @Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });
super.addSubscription(context, sub); super.addSubscription(context, sub);
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
@ -500,13 +500,14 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// while removing up a subscription. // while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
try { try {
LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
getActiveMQDestination().getQualifiedName(), getActiveMQDestination().getQualifiedName(),
sub, sub,
lastDeiveredSequenceId, lastDeiveredSequenceId,
getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDequeues().getCount(),
getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getDispatched().getCount(),
getDestinationStatistics().getInflight().getCount() getDestinationStatistics().getInflight().getCount(),
sub.getConsumerInfo().getAssignedGroupCount()
}); });
consumersLock.writeLock().lock(); consumersLock.writeLock().lock();
try { try {
@ -1975,19 +1976,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// dispatched before. // dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
} }
// and now see if we can dispatch the new stuff.. and append to the pending }
// list anything that does not actually get dispatched. // and now see if we can dispatch the new stuff.. and append to the pending
if (list != null && !list.isEmpty()) { // list anything that does not actually get dispatched.
if (pagedInPendingDispatch.isEmpty()) { if (list != null && !list.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list)); if (redeliveredWaitingDispatch.isEmpty() && pagedInPendingDispatch.isEmpty()) {
} else { pagedInPendingDispatch.addAll(doActualDispatch(list));
for (MessageReference qmr : list) { } else {
if (!pagedInPendingDispatch.contains(qmr)) { for (MessageReference qmr : list) {
pagedInPendingDispatch.addMessageLast(qmr); if (!pagedInPendingDispatch.contains(qmr)) {
} pagedInPendingDispatch.addMessageLast(qmr);
} }
doWakeUp = true;
} }
doWakeUp = true;
} }
} }
} finally { } finally {
@ -2006,7 +2007,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
*/ */
private PendingList doActualDispatch(PendingList list) throws Exception { private PendingList doActualDispatch(PendingList list) throws Exception {
List<Subscription> consumers; List<Subscription> consumers;
consumersLock.writeLock().lock(); consumersLock.readLock().lock();
try { try {
if (this.consumers.isEmpty()) { if (this.consumers.isEmpty()) {
@ -2015,7 +2016,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
consumers = new ArrayList<Subscription>(this.consumers); consumers = new ArrayList<Subscription>(this.consumers);
} finally { } finally {
consumersLock.writeLock().unlock(); consumersLock.readLock().unlock();
} }
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
@ -2101,7 +2102,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// A group sequence < 1 is an end of group signal. // A group sequence < 1 is an end of group signal.
if (sequence < 0) { if (sequence < 0) {
messageGroupOwners.removeGroup(groupId); messageGroupOwners.removeGroup(groupId);
subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1); subscription.getConsumerInfo().decrementAssignedGroupCount();
} }
} else { } else {
result = false; result = false;
@ -2117,7 +2118,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
Message message = n.getMessage(); Message message = n.getMessage();
message.setJMSXGroupFirstForConsumer(true); message.setJMSXGroupFirstForConsumer(true);
subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1); subs.getConsumerInfo().incrementAssignedGroupCount();
} }
protected void pageInMessages(boolean force) throws Exception { protected void pageInMessages(boolean force) throws Exception {

View File

@ -62,9 +62,8 @@ public class ConsumerInfo extends BaseCommand {
// not marshalled, populated from RemoveInfo, the last message delivered, used // not marshalled, populated from RemoveInfo, the last message delivered, used
// to suppress redelivery on prefetched messages after close // to suppress redelivery on prefetched messages after close
// overload; also used at runtime to track assignment of message groups
private transient long lastDeliveredSequenceId; private transient long lastDeliveredSequenceId;
private transient long assignedGroupCount;
// originated from a // originated from a
// network connection // network connection
@ -495,4 +494,16 @@ public class ConsumerInfo extends BaseCommand {
return lastDeliveredSequenceId; return lastDeliveredSequenceId;
} }
public void incrementAssignedGroupCount() {
this.assignedGroupCount++;
}
public void decrementAssignedGroupCount() {
this.assignedGroupCount--;
}
public long getAssignedGroupCount() {
return assignedGroupCount;
}
} }

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(BlockJUnit4ClassRunner.class)
public class MessageGroupReconnectDistributionTest {
public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
protected Connection connection;
protected Session session;
protected MessageProducer producer;
protected Destination destination;
BrokerService broker;
protected TransportConnector connector;
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=30");
connection = connFactory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = new ActiveMQQueue("GroupQ");
producer = session.createProducer(destination);
connection.start();
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setPersistent(false);
service.setUseJmx(true);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setUseConsumerPriority(true);
policy.setMessageGroupMapFactoryType("cached");
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);
connector = service.addConnector("tcp://localhost:0");
return service;
}
@After
public void tearDown() throws Exception {
producer.close();
session.close();
connection.close();
broker.stop();
}
final Random random = new Random();
public int getBatchSize(int bound) throws Exception {
return bound + random.nextInt(bound);
}
@Test(timeout = 20 * 60 * 1000)
public void testReconnect() throws Exception {
final int numMessages = 50000;
final int numConsumers = 10;
final AtomicLong totalConsumed = new AtomicLong(0);
produceMessages(numMessages);
ExecutorService executorService = Executors.newCachedThreadPool();
final ArrayList<AtomicLong> consumedCounters = new ArrayList<AtomicLong>(numConsumers);
for (int i=0;i<numConsumers; i++) {
consumedCounters.add(new AtomicLong(0l));
final int id = i;
executorService.submit(new Runnable() {
Session connectionSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@Override
public void run() {
try {
MessageConsumer messageConsumer = connectionSession.createConsumer(destination);
long batchSize = getBatchSize(numConsumers);
Message message;
AtomicLong consumed = consumedCounters.get(id);
LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", totalConsumed:" + totalConsumed.get() + ", consumed:" + consumed.get());
while (totalConsumed.get() < numMessages) {
message = messageConsumer.receive(10000);
if (message == null) {
LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", null message (totalConsumed:" + totalConsumed.get() + ") consumed:" + consumed.get());
messageConsumer.close();
if (totalConsumed.get() == numMessages) {
break;
} else {
messageConsumer = connectionSession.createConsumer(destination);
continue;
}
}
message.acknowledge();
consumed.incrementAndGet();
totalConsumed.incrementAndGet();
if (consumed.get() > 0 && consumed.longValue() % batchSize == 0) {
messageConsumer.close();
messageConsumer = connectionSession.createConsumer(destination);
batchSize = getBatchSize(numConsumers);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
assertTrue("threads done on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
assertEquals("All consumed", numMessages, totalConsumed.intValue());
LOG.info("Distribution: " + consumedCounters);
double max = consumedCounters.get(0).longValue() * 1.5;
double min = consumedCounters.get(0).longValue() * 0.5;
for (AtomicLong l : consumedCounters) {
assertTrue("Even +/- 50% distribution on consumed:" + consumedCounters + ", outlier:" + l.get(),
l.longValue() < max && l.longValue() > min);
}
}
private void produceMessages(int numMessages) throws JMSException {
for (int i = 0; i < numMessages; i++) {
TextMessage msga = session.createTextMessage("hello " +i);
msga.setStringProperty("JMSXGroupID", msga.getText());
producer.send(msga);
}
}
}