merge -c 900318 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2572 - issue with unconsumed prefetched messages being ignored as duplicates - timing issue with consumer close and session dispatch

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@900327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-18 10:15:28 +00:00
parent 94f6520eb5
commit fb871adbbd
3 changed files with 14 additions and 7 deletions

View File

@ -729,6 +729,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredMessages.clear(); deliveredMessages.clear();
} }
} }
unconsumedMessages.close();
this.session.removeConsumer(this);
List<MessageDispatch> list = unconsumedMessages.removeAll(); List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) { if (!this.info.isBrowser()) {
for (MessageDispatch old : list) { for (MessageDispatch old : list) {
@ -736,8 +738,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.connection.rollbackDuplicate(this, old.getMessage()); session.connection.rollbackDuplicate(this, old.getMessage());
} }
} }
unconsumedMessages.close();
this.session.removeConsumer(this);
} }
} }

View File

@ -1160,7 +1160,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.error("Failed to page in more queue messages ", e); LOG.error("Failed to page in more queue messages ", e);
} }
} }
return pendingWakeups.decrementAndGet() > 0; if (pendingWakeups.get() > 0) {
pendingWakeups.decrementAndGet();
}
return pendingWakeups.get() > 0;
} }
} }
@ -1333,7 +1336,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
int toPageIn = Math.min(getMaxPageSize(), messages.size()); int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size()); + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
} }
if (isLazyDispatch() && !force) { if (isLazyDispatch() && !force) {

View File

@ -63,7 +63,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
private boolean running; private boolean running;
private org.omg.CORBA.IntHolder startup; private org.omg.CORBA.IntHolder startup;
private Thread thread; private Thread thread;
private int numToProcessPerIteration; private final int numToProcessPerIteration;
Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) { Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) {
this.connectionFactory = connectionFactory; this.connectionFactory = connectionFactory;
@ -206,12 +206,15 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
try { try {
processMessage(session, producer, message); processMessage(session, producer, message);
session.commit(); session.commit();
numToProcess--;
} catch (Throwable t) { } catch (Throwable t) {
error("message=" + message + " failure", t); error("message=" + message + " failure", t);
session.rollback(); session.rollback();
} }
} else {
info("got null message on: " + numToProcess);
} }
} while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning()); } while ((numToProcessPerIteration == CONSUME_ALL || numToProcess > 0) && isRunning());
} }
public void run() { public void run() {
@ -360,7 +363,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
error("Failed to commit with count: " + messageCount.value, e); error("Failed to commit with count: " + messageCount.value, e);
} }
} }
messageCount.notify(); messageCount.notifyAll();
} }
} else { } else {
error("Producer cannot process " + reply.getClass().getSimpleName()); error("Producer cannot process " + reply.getClass().getSimpleName());
@ -441,6 +444,7 @@ public class AMQ2102Test extends CombinationTestSupport implements UncaughtExcep
String masterUrl; String masterUrl;
public void setUp() throws Exception { public void setUp() throws Exception {
setMaxTestTime(6 * 60 * 1000);
setAutoFail(true); setAutoFail(true);
master.setBrokerName("Master"); master.setBrokerName("Master");
master.addConnector("tcp://localhost:0"); master.addConnector("tcp://localhost:0");