resolve https://issues.apache.org/activemq/browse/AMQ-2693 - loaded machine with slow thread creation can delay interruption processing past next dispatch which can be problematic. prefetch=1 will workaround

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@933240 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-04-12 14:03:18 +00:00
parent fc52e42636
commit 9e54516aba
2 changed files with 24 additions and 12 deletions

View File

@ -144,6 +144,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private ExecutorService executorService; private ExecutorService executorService;
private MessageTransformer transformer; private MessageTransformer transformer;
private boolean clearDispatchList; private boolean clearDispatchList;
boolean inProgressClearRequiredFlag;
private MessageAck pendingAck; private MessageAck pendingAck;
private long lastDeliveredSequenceId; private long lastDeliveredSequenceId;
@ -655,23 +656,32 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session.asyncSendPacket(removeCommand); this.session.asyncSendPacket(removeCommand);
} }
void clearMessagesInProgress() { void inProgressClearRequired() {
inProgressClearRequiredFlag = true;
// deal with delivered messages async to avoid lock contention with in progress acks // deal with delivered messages async to avoid lock contention with in progress acks
clearDispatchList = true; clearDispatchList = true;
synchronized (unconsumedMessages.getMutex()) { }
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt"); void clearMessagesInProgress() {
} if (inProgressClearRequiredFlag) {
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer synchronized (unconsumedMessages.getMutex()) {
List<MessageDispatch> list = unconsumedMessages.removeAll(); if (inProgressClearRequiredFlag) {
if (!this.info.isBrowser()) { if (LOG.isDebugEnabled()) {
for (MessageDispatch old : list) { LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
session.connection.rollbackDuplicate(this, old.getMessage()); }
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
inProgressClearRequiredFlag = false;
} }
} }
} }
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
} }
void deliverAcks() { void deliverAcks() {
@ -1192,6 +1202,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispatch(MessageDispatch md) { public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get(); MessageListener listener = this.messageListener.get();
try { try {
clearMessagesInProgress();
clearDispatchList(); clearDispatchList();
synchronized (unconsumedMessages.getMutex()) { synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {

View File

@ -649,6 +649,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
// connection.transportInterruptionProcessingComplete() // connection.transportInterruptionProcessingComplete()
// //
for (final ActiveMQMessageConsumer consumer : consumers) { for (final ActiveMQMessageConsumer consumer : consumers) {
consumer.inProgressClearRequired();
scheduler.executeAfterDelay(new Runnable() { scheduler.executeAfterDelay(new Runnable() {
public void run() { public void run() {
consumer.clearMessagesInProgress(); consumer.clearMessagesInProgress();