git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@698595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-09-24 14:46:00 +00:00
parent 63b2c406a8
commit efdf8ce25a
3 changed files with 30 additions and 28 deletions

View File

@ -125,6 +125,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private MessageTransformer transformer;
private boolean clearDispatchList;
private MessageAck pendingAck;
/**
* Create a MessageConsumer
*
@ -615,6 +617,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackCounter = 0;
}
}
} else {
ack = pendingAck;
}
if (ack != null) {
final MessageAck ackToSend = ack;
@ -835,13 +839,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// The delivered message list is only needed for the recover method
// which is only used with client ack.
deliveredCounter++;
MessageAck oldPendingAck = pendingAck;
pendingAck = new MessageAck(md, ackType, deliveredCounter);
if( oldPendingAck==null ) {
pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
} else {
pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
}
pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
if( !deliveredMessages.isEmpty() ) {
ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
}
ack.setTransactionId(session.getTransactionContext().getTransactionId());
session.sendAck(ack);
session.sendAck(pendingAck);
pendingAck=null;
additionalWindowSize = deliveredCounter;
// When using DUPS ok, we do a real ack.

View File

@ -216,18 +216,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
throws Exception {
synchronized(dispatchLock) {
dequeueCounter++;
node
.getRegionDestination()
.getDestinationStatistics()
.getDequeues()
.increment();
node
.getRegionDestination()
.getDestinationStatistics()
.getInflight()
.decrement();
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--;
}
}
@ -236,6 +226,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Need to put it back in the front.
synchronized(dispatchLock) {
dispatched.add(0, node);
// ActiveMQ workaround for AMQ-1730 - Please Ignore next line
node.incrementRedeliveryCounter();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
}
});

View File

@ -509,22 +509,21 @@ public class JMSConsumerTest extends JmsTestSupport {
sendMessages(session, destination, 2);
session.commit();
// Only pick up the first message.
Message message1 = consumer.receive(1000);
assertNotNull(message1);
// Don't acknowledge yet. This should keep our prefetch full.
// The prefetch should fill up with 1 message.
// Since prefetch is still full, the 2nd message should get dispatched
// to
// another consumer.. lets create the 2nd consumer test that it does
// to another consumer.. lets create the 2nd consumer test that it does
// make sure it does.
ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
connections.add(connection2);
Session session2 = connection2.createSession(true, 0);
session2.createConsumer(destination);
MessageConsumer consumer2 = session2.createConsumer(destination);
// Only pick up the 2nd messages.
Message message2 = consumer.receive(1000);
// Pick up the first message.
Message message1 = consumer.receive(1000);
assertNotNull(message1);
// Pick up the 2nd messages.
Message message2 = consumer2.receive(1000);
assertNotNull(message2);
session.commit();