resolve AMQ-2102|https://issues.apache.org/activemq/browse/AMQ-2102 - refactor message dispatch on slave to take account of subscription choice on the master, this ensures slave is in sync w.r.t outstanding acks. processDispatchNotification imoplemented by Queue type destinations which delegates to subscription after doing a dispatch, test demonstrates slve out of sync errors

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@753214 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-03-13 11:59:08 +00:00
parent b514d3fb4b
commit f9d5449f47
10 changed files with 275 additions and 43 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.ft;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
@ -28,6 +30,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
@ -58,8 +61,9 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
private static final Log LOG = LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started = new AtomicBoolean(false);
private final Object addConsumerLock = new Object();
private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
/**
* Constructor
*
@ -197,14 +201,19 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
// as master and slave do independent dispatch, the consumer add order between master and slave
// needs to be maintained
synchronized (addConsumerLock) {
sendSyncToSlave(info);
return super.addConsumer(context, info);
}
sendSyncToSlave(info);
consumers.put(info.getConsumerId(), info.getConsumerId());
return super.addConsumer(context, info);
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
super.removeConsumer(context, info);
consumers.remove(info.getConsumerId());
sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
}
/**
* remove a subscription
*
@ -317,7 +326,9 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
if (messageDispatch.getMessage() != null) {
Message msg = messageDispatch.getMessage();
mdn.setMessageId(msg.getMessageId());
sendSyncToSlave(mdn);
if (consumers.containsKey(messageDispatch.getConsumerId())) {
sendSyncToSlave(mdn);
}
}
}

View File

@ -418,6 +418,34 @@ public abstract class AbstractRegion implements Region {
Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
if (sub != null) {
sub.processMessageDispatchNotification(messageDispatchNotification);
} else {
throw new JMSException("Slave broker out of sync with master - Subscription: "
+ messageDispatchNotification.getConsumerId()
+ " on " + messageDispatchNotification.getDestination()
+ " does not exist for dispatch of message: "
+ messageDispatchNotification.getMessageId());
}
}
/*
* For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till
* the notification to ensure that the subscription chosen by the master is used. AMQ-2102
*/
protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception {
Destination dest = null;
synchronized (destinationsMutex) {
dest = destinations.get(messageDispatchNotification.getDestination());
}
if (dest != null) {
dest.processDispatchNotification(messageDispatchNotification);
} else {
throw new JMSException(
"Slave broker out of sync with master - Destination: "
+ messageDispatchNotification.getDestination()
+ " does not exist for consumer "
+ messageDispatchNotification.getConsumerId()
+ " with message: "
+ messageDispatchNotification.getMessageId());
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
@ -27,6 +29,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
@ -485,4 +488,9 @@ public abstract class BaseDestination implements Destination {
}
}
}
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@ -175,4 +176,12 @@ public interface Destination extends Service, Task {
void isFull(ConnectionContext context,Usage usage);
List<Subscription> getConsumers();
/**
* called on Queues in slave mode to allow dispatch to follow subscription choice of master
* @param messageDispatchNotification
* @throws Exception
*/
void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception;
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
@ -259,4 +260,9 @@ public class DestinationFilter implements Destination {
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
}

View File

@ -38,7 +38,6 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -48,15 +47,14 @@ import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
@ -65,7 +63,6 @@ import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
@ -1001,7 +998,7 @@ public class Queue extends BaseDestination implements Task {
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
boolean pageInMoreMessages = false;
boolean pageInMoreMessages = false;
synchronized(iteratingMutex) {
BrowserDispatch rd;
while ((rd = getNextBrowserDispatch()) != null) {
@ -1244,13 +1241,13 @@ public class Queue extends BaseDestination implements Task {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
try {
messages.setMaxBatchSize(toPageIn);
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
@ -1326,7 +1323,8 @@ public class Queue extends BaseDestination implements Task {
List<Subscription> consumers;
synchronized (this.consumers) {
if (this.consumers.isEmpty()) {
if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
@ -1422,4 +1420,104 @@ public class Queue extends BaseDestination implements Task {
return total;
}
/*
* In slave mode, dispatch is ignored till we get this notification as the dispatch
* process is non deterministic between master and slave.
* On a notification, the actual dispatch to the subscription (as chosen by the master)
* is completed.
* (non-Javadoc)
* @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
*/
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
// do dispatch
Subscription sub = getMatchingSubscription(messageDispatchNotification);
if (sub != null) {
MessageReference message = getMatchingMessage(messageDispatchNotification);
sub.add(message);
sub.processMessageDispatchNotification(messageDispatchNotification);
}
}
private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
dispatchLock.lock();
try {
synchronized (pagedInPendingDispatch) {
for(QueueMessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) {
message = ref;
pagedInPendingDispatch.remove(ref);
break;
}
}
}
if (message == null) {
synchronized (pagedInMessages) {
message = pagedInMessages.get(messageId);
}
}
if (message == null) {
synchronized (messages) {
try {
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext()) {
MessageReference node = messages.next();
node.incrementReferenceCount();
messages.remove();
if (messageId.equals(node.getMessageId())) {
message = this.createMessageReference(node.getMessage());
break;
}
}
} finally {
messages.release();
}
}
}
if (message == null) {
Message msg = loadMessage(messageId);
if (msg != null) {
message = this.createMessageReference(msg);
}
}
} finally {
dispatchLock.unlock();
}
if (message == null) {
throw new JMSException(
"Slave broker out of sync with master - Message: "
+ messageDispatchNotification.getMessageId()
+ " on " + messageDispatchNotification.getDestination()
+ " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: "
+ messageDispatchNotification.getConsumerId());
}
return message;
}
/**
* Find a consumer that matches the id in the message dispatch notification
* @param messageDispatchNotification
* @return sub or null if the subscription has been removed before dispatch
* @throws JMSException
*/
private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException {
Subscription sub = null;
synchronized (consumers) {
for (Subscription s : consumers) {
if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
sub = s;
break;
}
}
}
return sub;
}
}

View File

@ -24,6 +24,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
@ -64,4 +65,15 @@ public class QueueRegion extends AbstractRegion {
}
return inactiveDestinations;
}
/*
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
* the notification to ensure that the subscription chosen by the master is used.
*
* (non-Javadoc)
* @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
*/
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
processDispatchNotificationViaDestination(messageDispatchNotification);
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
@ -72,4 +73,16 @@ public class TempQueueRegion extends AbstractTempRegion {
super.removeDestination(context, destination, timeout);
}
/*
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
* the notification to ensure that the subscription chosen by the master is used.
*
* (non-Javadoc)
* @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
*/
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
processDispatchNotificationViaDestination(messageDispatchNotification);
}
}

View File

@ -108,11 +108,8 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
// REVISIT the following two are not dependable at the moment, off by a small number
// for some reason? The work for a COUNT < ~500
//
//assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
//assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount());
assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount());
assertEquals("dequeues match",
rb.getDestinationStatistics().getDequeues().getCount(),

View File

@ -47,14 +47,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
final static int MESSAGE_COUNT = 12120;
final static int NUM_CONSUMERS = 10;
final static int CONSUME_ALL = -1;
final static int MESSAGE_COUNT = 5120;
final static int NUM_CONSUMERS = 20;
private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
private final Map<Thread, Throwable> exceptions = new ConcurrentHashMap<Thread, Throwable>();
private final static Map<Thread, Throwable> exceptions = new ConcurrentHashMap<Thread, Throwable>();
private class Consumer implements Runnable, ExceptionListener {
private ActiveMQConnectionFactory connectionFactory;
@ -63,12 +64,14 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
private boolean running;
private org.omg.CORBA.IntHolder startup;
private Thread thread;
private int numToProcessPerIteration;
Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id) {
Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) {
this.connectionFactory = connectionFactory;
this.queueName = queueName;
this.startup = startup;
name = "Consumer-" + queueName + "-" + id;
numToProcessPerIteration = numToProcess;
thread = new Thread(this, name);
}
@ -93,6 +96,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
}
public void onException(JMSException e) {
exceptions.put(Thread.currentThread(), e);
error("JMS exception: ", e);
}
@ -146,7 +150,13 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
Session session = null;
try {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
processMessages(session);
if (numToProcessPerIteration > 0) {
while(isRunning()) {
processMessages(session);
}
} else {
processMessages(session);
}
} finally {
if (session != null) {
session.close();
@ -189,7 +199,8 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
}
startup = null;
}
while (isRunning()) {
int numToProcess = numToProcessPerIteration;
do {
Message message = consumer.receive(5000);
if (message != null) {
@ -201,7 +212,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
session.rollback();
}
}
}
} while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning());
}
public void run() {
@ -224,7 +235,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
}
}
private class Producer {
private class Producer implements ExceptionListener {
private ActiveMQConnectionFactory connectionFactory;
private String queueName;
@ -246,6 +257,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
try {
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
sendMessages(connection);
@ -302,6 +314,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
sendMessages(session, replyQueue, consumer);
} finally {
consumer.close();
session.commit();
}
}
@ -326,9 +339,8 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
}
}
private void sendMessages(Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException {
private void sendMessages(final Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException {
final org.omg.CORBA.IntHolder messageCount = new org.omg.CORBA.IntHolder(MESSAGE_COUNT);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message reply) {
if (reply instanceof TextMessage) {
@ -340,6 +352,15 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
error("Problem processing reply", e);
}
messageCount.value--;
if (messageCount.value % 200 == 0) {
// ack a bunch of replys
info("acking via session commit: messageCount=" + messageCount.value);
try {
session.commit();
} catch (JMSException e) {
error("Failed to commit with count: " + messageCount.value, e);
}
}
messageCount.notify();
}
} else {
@ -354,11 +375,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
synchronized (messageCount) {
while (messageCount.value > 0) {
if (messageCount.value % 100 == 0) {
// ack a bunch of replys
debug("acking via session commit: messageCount=" + messageCount.value);
session.commit();
}
try {
messageCount.wait();
} catch (InterruptedException e) {
@ -370,12 +387,21 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
session.commit();
debug("All replies received...");
}
public void onException(JMSException exception) {
LOG.error(exception);
exceptions.put(Thread.currentThread(), exception);
}
}
private static void debug(String message) {
LOG.debug(message);
}
private static void info(String message) {
LOG.info(message);
}
private static void error(String message) {
LOG.error(message);
}
@ -384,15 +410,17 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
t.printStackTrace();
String msg = message + ": " + (t.getMessage() != null ? t.getMessage() : t.toString());
LOG.error(msg, t);
exceptions.put(Thread.currentThread(), t);
fail(msg);
}
private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName, int max) {
private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName,
int max, int numToProcessPerConsumer) {
ArrayList<Consumer> consumers = new ArrayList<Consumer>(max);
org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max);
for (int id = 0; id < max; id++) {
consumers.add(new Consumer(connectionFactory, queueName, startup, id));
consumers.add(new Consumer(connectionFactory, queueName, startup, id, numToProcessPerConsumer));
}
for (Consumer consumer : consumers) {
consumer.start();
@ -445,6 +473,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
public void tearDown() throws Exception {
master.stop();
slave.stop();
exceptions.clear();
}
public void testMasterSlaveBug() throws Exception {
@ -453,7 +482,7 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" +
masterUrl + ")?randomize=false");
String queueName = "MasterSlaveBug";
ArrayList<Consumer> consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS);
ArrayList<Consumer> consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS, CONSUME_ALL);
Producer producer = new Producer(connectionFactory, queueName);
producer.execute(new String[]{});
@ -468,10 +497,31 @@ public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
assertTrue(exceptions.isEmpty());
}
public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
Thread.setDefaultUncaughtExceptionHandler(this);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"failover:(" + masterUrl + ")?randomize=false");
String queueName = "MasterSlaveBug";
ArrayList<Consumer> consumers = createConsumers(connectionFactory,
queueName, NUM_CONSUMERS, 10);
Producer producer = new Producer(connectionFactory, queueName);
producer.execute(new String[] {});
for (Consumer consumer : consumers) {
consumer.setRunning(false);
}
for (Consumer consumer : consumers) {
consumer.join();
}
assertTrue(exceptions.isEmpty());
}
public void uncaughtException(Thread t, Throwable e) {
error("" + t + e);
exceptions.put(t,e);
}
}