mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@811425 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b0e3570652
commit
c459784012
|
@ -16,29 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
|
@ -77,6 +54,26 @@ import org.apache.activemq.usage.UsageListener;
|
|||
import org.apache.activemq.util.BrokerSupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -100,7 +97,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
private final Object sendLock = new Object();
|
||||
private ExecutorService executor;
|
||||
protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
|
||||
private final ReentrantLock dispatchLock = new ReentrantLock();
|
||||
private final Object dispatchMutex = new Object();
|
||||
private boolean useConsumerPriority=true;
|
||||
private boolean strictOrderDispatch=false;
|
||||
private QueueDispatchSelector dispatchSelector;
|
||||
|
@ -276,8 +273,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
// synchronize with dispatch method so that no new messages are sent
|
||||
// while setting up a subscription. avoid out of order messages,
|
||||
// duplicates, etc.
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
|
||||
sub.add(context, this);
|
||||
destinationStatistics.getConsumers().increment();
|
||||
|
||||
|
@ -324,8 +321,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
if (!(this.optimizedDispatch || isSlave())) {
|
||||
wakeup();
|
||||
}
|
||||
}finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
if (this.optimizedDispatch || isSlave()) {
|
||||
// Outside of dispatchLock() to maintain the lock hierarchy of
|
||||
|
@ -339,8 +334,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
destinationStatistics.getConsumers().decrement();
|
||||
// synchronize with dispatch method so that no new messages are sent
|
||||
// while removing up a subscription.
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
|
||||
+ ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
|
||||
|
@ -390,8 +384,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
if (!(this.optimizedDispatch || isSlave())) {
|
||||
wakeup();
|
||||
}
|
||||
}finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
if (this.optimizedDispatch || isSlave()) {
|
||||
// Outside of dispatchLock() to maintain the lock hierarchy of
|
||||
|
@ -750,8 +742,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
try {
|
||||
pageInMessages(forcePageIn);
|
||||
List<MessageReference> toExpire = new ArrayList<MessageReference>();
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
synchronized (pagedInPendingDispatch) {
|
||||
addAll(pagedInPendingDispatch, l, max, toExpire);
|
||||
for (MessageReference ref : toExpire) {
|
||||
|
@ -796,9 +787,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Problem retrieving message for browse", e);
|
||||
}
|
||||
|
@ -1161,12 +1150,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
|
||||
// Kinda ugly.. but I think dispatchLock is the only mutex protecting the
|
||||
// pagedInPendingDispatch variable.
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
|
||||
} finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// Perhaps we should page always into the pagedInPendingDispatch list is
|
||||
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
|
||||
|
@ -1328,8 +1314,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
|
||||
List<QueueMessageReference> result = null;
|
||||
List<QueueMessageReference> resultList = null;
|
||||
dispatchLock.lock();
|
||||
try{
|
||||
synchronized(dispatchMutex) {
|
||||
int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1381,15 +1366,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
// Avoid return null list, if condition is not validated
|
||||
resultList = new ArrayList<QueueMessageReference>();
|
||||
}
|
||||
}finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
private void doDispatch(List<QueueMessageReference> list) throws Exception {
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
|
||||
synchronized (pagedInPendingDispatch) {
|
||||
if (!pagedInPendingDispatch.isEmpty()) {
|
||||
// Try to first dispatch anything that had not been
|
||||
|
@ -1412,9 +1395,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1545,8 +1526,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
QueueMessageReference message = null;
|
||||
MessageId messageId = messageDispatchNotification.getMessageId();
|
||||
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
synchronized(dispatchMutex) {
|
||||
synchronized (pagedInPendingDispatch) {
|
||||
for(QueueMessageReference ref : pagedInPendingDispatch) {
|
||||
if (messageId.equals(ref.getMessageId())) {
|
||||
|
@ -1590,9 +1570,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
dispatchLock.unlock();
|
||||
}
|
||||
}
|
||||
if (message == null) {
|
||||
throw new JMSException(
|
||||
"Slave broker out of sync with master - Message: "
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
|
||||
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.perf.NumberOfDestinationsTest;
|
||||
import org.apache.activemq.store.kahadb.KahaDBStore;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.File;
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.TestCase;
|
||||
/*
|
||||
A AMQ2356Test
|
||||
We have an environment where we have a very large number of destinations.
|
||||
In an effort to reduce the number of threads I have set the options
|
||||
-Dorg.apache.activemq.UseDedicatedTaskRunner=false
|
||||
|
||||
and
|
||||
|
||||
<policyEntry queue=">" optimizedDispatch="true"/>
|
||||
|
||||
Unfortunately this very quickly leads to deadlocked queues.
|
||||
|
||||
My environment is:
|
||||
|
||||
ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
|
||||
TCP transportConnector
|
||||
|
||||
To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
|
||||
Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
|
||||
I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
|
||||
The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
|
||||
deadlocked at less than 30 messages each.
|
||||
*/
|
||||
public class AMQ2356Test extends TestCase {
|
||||
protected static final int MESSAGE_COUNT = 1000;
|
||||
protected static final int NUMBER_OF_PAIRS = 10;
|
||||
private static final Log LOG = LogFactory.getLog(NumberOfDestinationsTest.class);
|
||||
protected BrokerService broker;
|
||||
protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
|
||||
protected int destinationCount;
|
||||
|
||||
public void testScenario() throws Exception {
|
||||
for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
|
||||
ActiveMQQueue queue = new ActiveMQQueue(getClass().getName()+":"+i);
|
||||
ProducerConsumerPair cp = new ProducerConsumerPair();
|
||||
cp.start(this.brokerURL, queue, MESSAGE_COUNT);
|
||||
cp.testRun();
|
||||
cp.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination getDestination(Session session) throws JMSException {
|
||||
String destinationName = getClass().getName() + "." + destinationCount++;
|
||||
return session.createQueue(destinationName);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
}
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
configureBroker(answer);
|
||||
answer.start();
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void configureBroker(BrokerService answer) throws Exception {
|
||||
File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
|
||||
KahaDBStore kaha = new KahaDBStore();
|
||||
kaha.setDirectory(dataFileDir);
|
||||
answer.setUseJmx(false);
|
||||
// Setup a destination policy where it takes only 1 message at a time.
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setOptimizedDispatch(true);
|
||||
policyMap.setDefaultEntry(policy);
|
||||
answer.setDestinationPolicy(policyMap);
|
||||
|
||||
answer.setAdvisorySupport(false);
|
||||
answer.setEnableStatistics(false);
|
||||
answer.setDeleteAllMessagesOnStartup(true);
|
||||
answer.addConnector(brokerURL);
|
||||
|
||||
}
|
||||
static class ProducerConsumerPair {
|
||||
private Destination destination;
|
||||
private MessageProducer producer;
|
||||
private MessageConsumer consumer;
|
||||
private Connection producerConnection;
|
||||
private Connection consumerConnection;
|
||||
private int numberOfMessages;
|
||||
|
||||
ProducerConsumerPair(){
|
||||
|
||||
}
|
||||
void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
|
||||
this.destination=dest;
|
||||
this.numberOfMessages=msgNum;
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
|
||||
this.producerConnection = cf.createConnection();
|
||||
this.producerConnection.start();
|
||||
this.consumerConnection = cf.createConnection();
|
||||
this.consumerConnection.start();
|
||||
this.producer=createProducer(this.producerConnection);
|
||||
this.consumer=createConsumer(this.consumerConnection);
|
||||
}
|
||||
|
||||
void testRun() throws Exception {
|
||||
|
||||
|
||||
Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
for (int i = 0 ; i < this.numberOfMessages; i++) {
|
||||
BytesMessage msg = s.createBytesMessage();
|
||||
msg.writeBytes(new byte[1024]);
|
||||
this.producer.send(msg);
|
||||
}
|
||||
int received = 0;
|
||||
for (int i = 0 ; i < this.numberOfMessages; i++) {
|
||||
Message msg = this.consumer.receive();
|
||||
assertNotNull(msg);
|
||||
received++;
|
||||
}
|
||||
assertEquals("Messages received on " + this.destination,this.numberOfMessages,received);
|
||||
|
||||
|
||||
}
|
||||
|
||||
void stop() throws Exception {
|
||||
if (this.producerConnection != null) {
|
||||
this.producerConnection.close();
|
||||
}
|
||||
if (this.consumerConnection != null) {
|
||||
this.consumerConnection.close();
|
||||
}
|
||||
}
|
||||
|
||||
private MessageProducer createProducer(Connection connection) throws Exception {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer result = session.createProducer(this.destination);
|
||||
return result;
|
||||
}
|
||||
|
||||
private MessageConsumer createConsumer(Connection connection) throws Exception {
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer result = session.createConsumer(this.destination);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -76,6 +76,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setOptimizedDispatch(true);
|
||||
defaultEntry.setExpireMessagesPeriod(100);
|
||||
defaultEntry.setMaxExpirePageSize(800);
|
||||
|
||||
|
|
Loading…
Reference in New Issue