fix AMQ-1917

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@690144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-08-29 08:21:09 +00:00
parent 261157cdd6
commit ba16efde64
2 changed files with 236 additions and 30 deletions

View File

@ -210,11 +210,13 @@ public class Queue extends BaseDestination implements Task {
LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
dispatchLock.lock();
try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
// MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
@ -229,32 +231,28 @@ public class Queue extends BaseDestination implements Task {
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
// etc.
// any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
doPageIn(false);
synchronized (pagedInMessages) {
RecoveryDispatch rd = new RecoveryDispatch();
rd.messages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
rd.subscription = sub;
recoveries.addLast(rd);
}
if( sub instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)sub).incrementQueueRef();
}
if (!this.optimizedDispatch) {
wakeup();
wakeup();
}
}finally {
dispatchLock.unlock();
}
if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
@ -262,11 +260,10 @@ public class Queue extends BaseDestination implements Task {
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
dispatchLock.lock();
try {
// synchronize with dispatch method so that no new messages are sent
// while
// removing up a subscription.
synchronized (consumers) {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
@ -324,7 +321,6 @@ public class Queue extends BaseDestination implements Task {
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
// System.out.println(getName()+" send "+message.getMessageId());
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
@ -934,9 +930,17 @@ public class Queue extends BaseDestination implements Task {
for (QueueMessageReference node : rd.messages) {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) {
rd.subscription.add(node);
if (rd.subscription.matches(node, msgContext)) {
rd.subscription.add(node);
} else {
// make sure it gets queued for dispatched again
dispatchLock.lock();
try {
pagedInPendingDispatch.add(node);
} finally {
dispatchLock.unlock();
}
}
}
}
@ -949,24 +953,24 @@ public class Queue extends BaseDestination implements Task {
}
}
boolean result = false;
boolean pageInMoreMessages = false;
synchronized (messages) {
result = !messages.isEmpty();
pageInMoreMessages = !messages.isEmpty();
}
// Kinda ugly.. but I think dispatchLock is the only mutex protecting the
// pagedInPendingDispatch variable.
dispatchLock.lock();
try {
result |= !pagedInPendingDispatch.isEmpty();
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
} finally {
dispatchLock.unlock();
}
// Perhaps we should page always into the pagedInPendingDispatch list is
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
if (result) {
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
if (pageInMoreMessages) {
try {
pageInMessages(false);
@ -1116,8 +1120,8 @@ public class Queue extends BaseDestination implements Task {
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
toPageIn = Math.min(toPageIn,getMaxPageSize());
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
@ -1158,21 +1162,17 @@ public class Queue extends BaseDestination implements Task {
dispatchLock.lock();
try {
if(!pagedInPendingDispatch.isEmpty()) {
// System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
// Try to first dispatch anything that had not been dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
}
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
// System.out.println(getName()+": dispatching from paged in: "+list.size());
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
pagedInPendingDispatch.addAll(list);
}
// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
}
} finally {
dispatchLock.unlock();
@ -1200,7 +1200,6 @@ public class Queue extends BaseDestination implements Task {
if (!s.isFull()) {
// Dispatch it.
s.add(node);
//System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
target = s;
break;
} else {

View File

@ -0,0 +1,207 @@
package org.apache.activemq.bugs;
import junit.framework.TestCase;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
public class AMQ1917Test extends TestCase {
private static final int NUM_MESSAGES = 4000;
private static final int NUM_THREADS = 10;
public static final String REQUEST_QUEUE = "mock.in.queue";
public static final String REPLY_QUEUE = "mock.out.queue";
Destination requestDestination = ActiveMQDestination.createDestination(
REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
Destination replyDestination = ActiveMQDestination.createDestination(
REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
CountDownLatch errorLatch = new CountDownLatch(1);
ThreadPoolExecutor tpe;
final String BROKER_URL = "tcp://localhost:61616";
BrokerService broker = null;
private boolean working = true;
// trival session/producer pool
final Session[] sessions = new Session[NUM_THREADS];
final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(BROKER_URL);
broker.start();
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
TimeUnit.MILLISECONDS, queue);
ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
tpe.setThreadFactory(limitedthreadFactory);
}
public void tearDown() throws Exception {
broker.stop();
tpe.shutdown();
}
public void testLoadedSendRecieveWithCorrelationId() throws Exception {
ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
Connection connection = connectionFactory.createConnection();
setupReceiver(connection);
connection = connectionFactory.createConnection();
connection.start();
// trival session/producer pool
for (int i=0; i<NUM_THREADS; i++) {
sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[i] = sessions[i].createProducer(requestDestination);
}
for (int i = 0; i < NUM_MESSAGES; i++) {
MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
replyDestination, "Test Message : " + i);
tpe.execute(msr);
}
while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("there was an error, check the console for thread or thread allocation failure");
break;
}
}
working = false;
}
private void setupReceiver(final Connection connection) throws Exception {
final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session
.createConsumer(requestDestination);
final MessageProducer sender = session.createProducer(replyDestination);
connection.start();
new Thread() {
public void run() {
while (working) {
// wait for messages in infinitive loop
// time out is set to show the client is awaiting
try {
TextMessage msg = (TextMessage) consumer.receive(20000);
if (msg == null) {
errorLatch.countDown();
fail("Response timed out."
+ " latchCount=" + roundTripLatch.getCount());
} else {
String result = msg.getText();
//System.out.println("Request:" + (i++)
// + ", msg=" + result + ", ID" + msg.getJMSMessageID());
TextMessage response = session.createTextMessage();
response.setJMSCorrelationID(msg.getJMSMessageID());
response.setText(result);
sender.send(response);
}
} catch (JMSException e) {
errorLatch.countDown();
fail("Unexpected exception:" + e);
}
}
}
}.start();
}
class MessageSenderReceiver implements Runnable {
Destination reqDest;
Destination replyDest;
String origMsg;
public MessageSenderReceiver(Destination reqDest,
Destination replyDest, String msg) throws Exception {
this.replyDest = replyDest;
this.reqDest = reqDest;
this.origMsg = msg;
}
private int getIndexFromCurrentThread() {
String name = Thread.currentThread().getName();
String num = name.substring(name.lastIndexOf('-') +1);
int idx = Integer.parseInt(num) -1;
assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS);
return idx;
}
public void run() {
try {
// get thread session and producer from pool
int threadIndex = getIndexFromCurrentThread();
Session session = sessions[threadIndex];
MessageProducer producer = producers[threadIndex];
final Message sendJmsMsg = session.createTextMessage(origMsg);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(sendJmsMsg);
String jmsId = sendJmsMsg.getJMSMessageID();
String selector = "JMSCorrelationID='" + jmsId + "'";
MessageConsumer consumer = session.createConsumer(replyDest,
selector);
Message receiveJmsMsg = consumer.receive(2000);
consumer.close();
if (receiveJmsMsg == null) {
errorLatch.countDown();
fail("Unable to receive response for:" + origMsg
+ ", with selector=" + selector);
} else {
//System.out.println("received response message :"
// + ((TextMessage) receiveJmsMsg).getText()
// + " with selector : " + selector);
roundTripLatch.countDown();
}
} catch (JMSException e) {
fail("unexpected exception:" + e);
}
}
}
public class LimitedThreadFactory implements ThreadFactory {
int threadCount;
private ThreadFactory factory;
public LimitedThreadFactory(ThreadFactory threadFactory) {
this.factory = threadFactory;
}
public Thread newThread(Runnable arg0) {
if (++threadCount > NUM_THREADS) {
errorLatch.countDown();
fail("too many threads requested");
}
return factory.newThread(arg0);
}
}
}