From 7ceb4cbc432afeea89928a6333f1f215a351eee4 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 18 Jan 2010 09:43:21 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2572 - issue with unconsumed prefetched messages being ignored as duplicates - timing issue with consumer close and session dispatch git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@900318 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/ActiveMQMessageConsumer.java | 4 ++-- .../java/org/apache/activemq/broker/region/Queue.java | 7 +++++-- .../java/org/apache/activemq/bugs/AMQ2102Test.java | 11 +++++++---- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 751da86362..d4d718d10f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -729,6 +729,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredMessages.clear(); } } + unconsumedMessages.close(); + this.session.removeConsumer(this); List list = unconsumedMessages.removeAll(); if (!this.info.isBrowser()) { for (MessageDispatch old : list) { @@ -736,8 +738,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC session.connection.rollbackDuplicate(this, old.getMessage()); } } - unconsumedMessages.close(); - this.session.removeConsumer(this); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 6403b30e06..2c479b3ca6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1160,7 +1160,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { LOG.error("Failed to page in more queue messages ", e); } } - return pendingWakeups.decrementAndGet() > 0; + if (pendingWakeups.get() > 0) { + pendingWakeups.decrementAndGet(); + } + return pendingWakeups.get() > 0; } } @@ -1333,7 +1336,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { int toPageIn = Math.min(getMaxPageSize(), messages.size()); if (LOG.isDebugEnabled()) { LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " - + pagedInMessages.size()); + + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount()); } if (isLazyDispatch() && !force) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java index 839136421b..0a81ee87ae 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java @@ -63,7 +63,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep private boolean running; private org.omg.CORBA.IntHolder startup; private Thread thread; - private int numToProcessPerIteration; + private final int numToProcessPerIteration; Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) { this.connectionFactory = connectionFactory; @@ -206,12 +206,15 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep try { processMessage(session, producer, message); session.commit(); + numToProcess--; } catch (Throwable t) { error("message=" + message + " failure", t); session.rollback(); } + } else { + info("got null message on: " + numToProcess); } - } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning()); + } while ((numToProcessPerIteration == CONSUME_ALL || numToProcess > 0) && isRunning()); } public void run() { @@ -360,7 +363,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep error("Failed to commit with count: " + messageCount.value, e); } } - messageCount.notify(); + messageCount.notifyAll(); } } else { error("Producer cannot process " + reply.getClass().getSimpleName()); @@ -441,7 +444,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep String masterUrl; public void setUp() throws Exception { - setMaxTestTime(10 * 60 * 1000); + setMaxTestTime(6 * 60 * 1000); setAutoFail(true); super.setUp(); master.setBrokerName("Master");