Switching addMessageLast to tryAddMessageLast when messages are added
to a Queue pending cursor to allow a potential deadlock to be
avoided. There is more work to be done here but this will at least
prevent a deadlock from occurring.

Fix and test based off of a patch created by Timothy Bish.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-11-20 20:45:38 +00:00
parent b5dd0a16f4
commit cc6213ebf2
9 changed files with 354 additions and 62 deletions

View File

@ -827,33 +827,38 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
ListenableFuture<Object> result = null;
producerExchange.incrementSend();
checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly();
try {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (store != null && message.isPersistent()) {
message.getMessageId().setFutureOrSequenceLong(null);
try {
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message));
} else {
store.addMessage(context, message);
do {
checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly();
try {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (store != null && message.isPersistent()) {
message.getMessageId().setFutureOrSequenceLong(null);
try {
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message));
} else {
store.addMessage(context, message);
}
if (isReduceMemoryFootprint()) {
message.clearMarshalledState();
}
} catch (Exception e) {
// we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations
resetNeeded = true;
throw e;
}
if (isReduceMemoryFootprint()) {
message.clearMarshalledState();
}
} catch (Exception e) {
// we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations
resetNeeded = true;
throw e;
}
if(tryOrderedCursorAdd(message, context)) {
break;
}
} finally {
sendLock.unlock();
}
orderedCursorAdd(message, context);
} finally {
sendLock.unlock();
}
} while (started.get());
if (store == null || (!context.isInTransaction() && !message.isPersistent())) {
messageSent(context, message);
}
@ -867,15 +872,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception {
private boolean tryOrderedCursorAdd(Message message, ConnectionContext context) throws Exception {
boolean result = true;
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null)));
} else if (store != null && message.isPersistent()) {
doPendingCursorAdditions();
} else {
// no ordering issue with non persistent messages
cursorAdd(message);
result = tryCursorAdd(message);
}
return result;
}
private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
@ -1813,7 +1822,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
final boolean cursorAdd(final Message msg) throws Exception {
private final boolean cursorAdd(final Message msg) throws Exception {
messagesLock.writeLock().lock();
try {
return messages.addMessageLast(msg);
@ -1822,6 +1831,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
private final boolean tryCursorAdd(final Message msg) throws Exception {
messagesLock.writeLock().lock();
try {
return messages.tryAddMessageLast(msg, 50);
} finally {
messagesLock.writeLock().unlock();
}
}
final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();

View File

@ -33,8 +33,8 @@ import org.apache.activemq.usage.SystemUsage;
/**
* Abstract method holder for pending message (messages awaiting disptach to a
* consumer) cursor
*
*
*
*
*/
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 70;
@ -49,12 +49,13 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
private boolean started=false;
protected MessageReference last = null;
protected final boolean prioritizedMessages;
public AbstractPendingMessageCursor(boolean prioritizedMessages) {
this.prioritizedMessages=prioritizedMessages;
}
@Override
public synchronized void start() throws Exception {
if (!started && enableAudit && audit==null) {
audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
@ -62,71 +63,89 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
started=true;
}
@Override
public synchronized void stop() throws Exception {
started=false;
gc();
}
@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
}
@Override
@SuppressWarnings("unchecked")
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
return Collections.EMPTY_LIST;
}
@Override
public boolean isRecoveryRequired() {
return true;
}
@Override
public void addMessageFirst(MessageReference node) throws Exception {
}
@Override
public boolean addMessageLast(MessageReference node) throws Exception {
return true;
}
public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
return addMessageLast(node);
return tryAddMessageLast(node, INFINITE_WAIT);
}
@Override
public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
return true;
}
@Override
public void addRecoveredMessage(MessageReference node) throws Exception {
addMessageLast(node);
}
@Override
public void clear() {
}
@Override
public boolean hasNext() {
return false;
}
@Override
public boolean isEmpty() {
return false;
}
@Override
public boolean isEmpty(Destination destination) {
return isEmpty();
}
@Override
public MessageReference next() {
return null;
}
@Override
public void remove() {
}
@Override
public void reset() {
}
@Override
public int size() {
return 0;
}
@Override
public int getMaxBatchSize() {
return maxBatchSize;
}
@Override
public void setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
@ -134,31 +153,39 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
protected void fillBatch() throws Exception {
}
@Override
public void resetForGC() {
reset();
}
@Override
public void remove(MessageReference node) {
}
@Override
public void gc() {
}
@Override
public void setSystemUsage(SystemUsage usageManager) {
this.systemUsage = usageManager;
}
@Override
public boolean hasSpace() {
return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
}
@Override
public boolean isFull() {
return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
}
@Override
public void release() {
}
@Override
public boolean hasMessagesBufferedToDeliver() {
return false;
}
@ -166,6 +193,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @return the memoryUsageHighWaterMark
*/
@Override
public int getMemoryUsageHighWaterMark() {
return memoryUsageHighWaterMark;
}
@ -173,6 +201,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
@Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
}
@ -180,25 +209,28 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @return the usageManager
*/
@Override
public SystemUsage getSystemUsage() {
return this.systemUsage;
}
/**
* destroy the cursor
*
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
stop();
}
/**
* Page in a restricted number of messages
*
*
* @param maxItems maximum number of messages to return
* @return a list of paged in messages
*/
@Override
public LinkedList<MessageReference> pageInList(int maxItems) {
throw new RuntimeException("Not supported");
}
@ -206,6 +238,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @return the maxProducersToAudit
*/
@Override
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
@ -213,6 +246,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @param maxProducersToAudit the maxProducersToAudit to set
*/
@Override
public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
if (audit != null) {
@ -223,25 +257,28 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @return the maxAuditDepth
*/
@Override
public int getMaxAuditDepth() {
return maxAuditDepth;
}
/**
* @param maxAuditDepth the maxAuditDepth to set
*/
@Override
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
if (audit != null) {
audit.setAuditDepth(maxAuditDepth);
}
}
/**
* @return the enableAudit
*/
@Override
public boolean isEnableAudit() {
return enableAudit;
}
@ -249,38 +286,44 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
/**
* @param enableAudit the enableAudit to set
*/
@Override
public synchronized void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
if (enableAudit && started && audit==null) {
audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
}
}
@Override
public boolean isTransient() {
return false;
}
/**
* set the audit
* @param audit new audit component
*/
@Override
public void setMessageAudit(ActiveMQMessageAudit audit) {
this.audit=audit;
}
/**
* @return the audit
*/
@Override
public ActiveMQMessageAudit getMessageAudit() {
return audit;
}
@Override
public boolean isUseCache() {
return useCache;
}
@Override
public void setUseCache(boolean useCache) {
this.useCache = useCache;
}
@ -290,7 +333,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
rollback(messageId);
return !unique;
}
/**
* records a message id and checks if it is a duplicate
* @param messageId
@ -302,17 +345,18 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
}
return !audit.isDuplicate(messageId);
}
@Override
public synchronized void rollback(MessageId id) {
if (audit != null) {
audit.rollback(id);
}
}
public synchronized boolean isStarted() {
return started;
}
public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
boolean result = false;
Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
@ -328,6 +372,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
}
@Override
public synchronized boolean isCacheEnabled() {
return cacheEnabled;
}
@ -336,6 +381,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
cacheEnabled = val;
}
@Override
public void rebase() {
}
}

View File

@ -44,7 +44,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private Iterator<MessageReference> iterator = null;
protected boolean batchResetNeeded = false;
protected int size;
private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
private static int SYNC_ADD = 0;
private static int ASYNC_ADD = 1;
final MessageId[] lastCachedIds = new MessageId[2];
@ -210,7 +210,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception {
boolean disableCache = false;
if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {

View File

@ -202,11 +202,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node
* @throws Exception
*/
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
return tryAddMessageLast(node, 0);
}
@Override
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {

View File

@ -35,6 +35,8 @@ import org.apache.activemq.usage.SystemUsage;
*/
public interface PendingMessageCursor extends Service {
static final long INFINITE_WAIT = 0;
/**
* Add a destination
*

View File

@ -183,7 +183,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (isStarted()) {

View File

@ -90,14 +90,14 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) throws Exception {
boolean result = true;
if (node != null) {
Message msg = node.getMessage();
if (started) {
pendingCount++;
if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node);
result = nonPersistent.tryAddMessageLast(node, maxWait);
}
}
if (msg.isPersistent()) {

View File

@ -102,7 +102,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
*/
@Override
public synchronized boolean addMessageLast(MessageReference node) {
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) {
node.incrementReferenceCount();
list.addMessageLast(node);
return true;

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test behavior of senders when broker side producer flow control kicks in.
*/
public class AMQ5712Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ5712Test.class);
@Rule public TestName name = new TestName();
private BrokerService brokerService;
private Connection connection;
@Before
public void setUp() throws Exception {
brokerService = createBroker();
brokerService.start();
brokerService.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {}
}
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
private Connection createConnection() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
factory.setAlwaysSyncSend(true);
return factory.createConnection();
}
@Test(timeout = 120000)
public void test() throws Exception {
connection = createConnection();
connection.start();
final int MSG_COUNT = 100;
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
byte[] payload = new byte[65535];
Arrays.fill(payload, (byte) 255);
final CountDownLatch done = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
Thread purge = new Thread(new Runnable() {
@Override
public void run() {
try {
while (!done.await(5, TimeUnit.SECONDS)) {
if (queueView.getBlockedSends() > 0 && queueView.getQueueSize() > 0) {
long queueSize = queueView.getQueueSize();
LOG.info("Queue send blocked at {} messages", queueSize);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < queueSize; i++) {
Message message = consumer.receive(60000);
if (message != null) {
counter.incrementAndGet();
message.acknowledge();
} else {
LOG.warn("Got null message when none as expected.");
}
}
consumer.close();
}
}
} catch (Exception ex) {
}
}
});
purge.start();
for (int i = 0; i < MSG_COUNT; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.send(message);
LOG.info("sent message: {}", i);
}
done.countDown();
purge.join(60000);
if (purge.isAlive()) {
fail("Consumer thread should have read initial batch and completed.");
}
//wait for processed acked messages
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return queueView.getDequeueCount() == counter.get();
}
}));
long remainingQueued = queueView.getQueueSize();
LOG.info("Remaining messages to consume: {}", remainingQueued);
assertEquals(remainingQueued, MSG_COUNT - counter.get());
MessageConsumer consumer = session.createConsumer(queue);
for (int i = counter.get(); i < MSG_COUNT; i++) {
Message message = consumer.receive(5000);
assertNotNull("Should not get null message", consumer);
counter.incrementAndGet();
message.acknowledge();
LOG.info("Read message: {}", i);
}
assertEquals("Should consume all messages", MSG_COUNT, counter.get());
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
KahaDBStore persistence = createStore(true);
persistence.setJournalMaxFileLength(1024 * 1024 * 1);
answer.setPersistent(true);
answer.setPersistenceAdapter(persistence);
answer.setDeleteAllMessagesOnStartup(true);
answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 6);
answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5);
answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 5);
answer.setUseJmx(true);
answer.getManagementContext().setCreateConnector(false);
answer.setSchedulerSupport(false);
answer.setAdvisorySupport(false);
PListStoreImpl tempStore = ((PListStoreImpl)answer.getSystemUsage().getTempUsage().getStore());
tempStore.setCleanupInterval(10000);
tempStore.setJournalMaxFileLength(1024 * 1024 * 2);
PolicyEntry policy = new PolicyEntry();
policy.setProducerFlowControl(false);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
answer.setDestinationPolicy(policyMap);
return answer;
}
private KahaDBStore createStore(boolean delete) throws IOException {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
if( delete ) {
kaha.deleteAllMessages();
}
return kaha;
}
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
}