reapplying patch from AMQ-1957 with small ammendment for transactional behaviour, reverting changes that fixed broken tests as tests now pass

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@712189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-11-07 16:38:51 +00:00
parent 92e143155b
commit 0bfb28a0bc
3 changed files with 16 additions and 16 deletions

View File

@ -200,13 +200,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
if (inAckRange) { if (inAckRange) {
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
removeList.add(node);
if (!context.isInTransaction()) { if (!context.isInTransaction()) {
dequeueCounter++; dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) { if (!this.getConsumerInfo().isBrowser()) {
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
} }
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
removeList.add(node);
} else { } else {
// setup a Synchronization to remove nodes from the // setup a Synchronization to remove nodes from the
// dispatched list. // dispatched list.
@ -217,6 +217,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
throws Exception { throws Exception {
synchronized(dispatchLock) { synchronized(dispatchLock) {
dequeueCounter++; dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--; prefetchExtension--;
@ -224,9 +225,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
// Need to put it back in the front.
synchronized(dispatchLock) { synchronized(dispatchLock) {
dispatched.add(0, node);
// ActiveMQ workaround for AMQ-1730 - Please Ignore next line // ActiveMQ workaround for AMQ-1730 - Please Ignore next line
node.incrementRedeliveryCounter(); node.incrementRedeliveryCounter();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
@ -307,11 +306,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} else if (ack.isRedeliveredAck()) { } else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be // Message was re-delivered but it was not yet considered to be
// a // a DLQ message.
// DLQ message.
// Acknowledge all dispatched messages up till the message id of // Acknowledge all dispatched messages up till the message id of
// the // the ack.
// acknowledgment.
boolean inAckRange = false; boolean inAckRange = false;
for (final MessageReference node : dispatched) { for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId(); MessageId messageId = node.getMessageId();
@ -396,9 +393,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
+ ack + ") was not in the dispatch list: " + ack + ") was not in the dispatch list: "
+ dispatched); + dispatched);
} else { } else {
LOG LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + ack);
+ ack);
} }
} }
} }
@ -442,7 +438,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
throw new JMSException("Unmatched acknowledege: " + ack throw new JMSException("Unmatched acknowledege: " + ack
+ "; Could not find Message-ID " + lastAckedMsg + "; Could not find Message-ID " + lastAckedMsg
+ " in dispatched-list (end of ack)"); + " in dispatched-list (end of ack)");
if (ack.getMessageCount() != checkCount && ack.isStandardAck()) { if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
throw new JMSException("Unmatched acknowledege: " + ack throw new JMSException("Unmatched acknowledege: " + ack
+ "; Expected message count (" + ack.getMessageCount() + "; Expected message count (" + ack.getMessageCount()
+ ") differs from count in dispatched-list (" + checkCount + ") differs from count in dispatched-list (" + checkCount
@ -571,7 +567,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
break; break;
} }
// Synchronize between dispatched list and remove of messageg from pending list // Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action // related to remove subscription action
synchronized(dispatchLock) { synchronized(dispatchLock) {
pending.remove(); pending.remove();

View File

@ -639,10 +639,14 @@ public class JMSConsumerTest extends JmsTestSupport {
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
// no commit so will auto rollback and get redispatched to redisptachConsumer // no commit so will auto rollback and get re-dispatched to redisptachConsumer
session.close(); session.close();
assertNotNull(redispatchConsumer.receive(1000)); Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue(msg.getJMSRedelivered());
// should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much!
assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit(); redispatchSession.commit();
assertNull(redispatchConsumer.receive(500)); assertNull(redispatchConsumer.receive(500));

View File

@ -35,8 +35,8 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
connection.setClientID("test"); connection.setClientID("test");
fail("Did not fail to connect as expected."); fail("Did not fail to connect as expected.");
} catch ( JMSException expected ) { } catch ( JMSException expected ) {
assertTrue("reason is java.net.UnknownHostException", expected.getCause() instanceof java.net.UnknownHostException); assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException);
} }
} }
} }