ensure duplicate acks are send immediatly and supppress outstanding delivery acks on transport resumption, AMQ-2149|https://issues.apache.org/activemq/browse/AMQ-2149

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@762464 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-04-06 19:20:12 +00:00
parent 21cd3e69c6
commit 9f548bb74a
4 changed files with 25 additions and 34 deletions

View File

@ -1836,12 +1836,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
TransportListener listener = iter.next(); TransportListener listener = iter.next();
listener.transportResumed(); listener.transportResumed();
} }
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
// deliverAcks at this point is too early as acks can arrive at the broker
// before redispatch of messages and hence be out or order
s.transportResumed();
}
} }
/** /**

View File

@ -1054,6 +1054,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.connection.rollbackDuplicate(this, old.getMessage()); session.connection.rollbackDuplicate(this, old.getMessage());
} }
} }
if (pendingAck != null && pendingAck.isDeliveredAck()) {
// on resumption a pending delivered ack will be out of sync with
// re deliveries.
if (LOG.isDebugEnabled()) {
LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
}
pendingAck = null;
}
} }
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
@ -1085,12 +1093,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else { } else {
// ignore duplicate // ignore duplicate
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " ignoring duplicate: " + md.getMessage()); LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
} }
// in a transaction ack delivery of duplicates to ensure prefetch extension kicks in. // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
// the normal ack will happen in the transaction. // the normal ack will happen in the transaction.
ackLater(md, session.isTransacted() ? if (session.isTransacted()) {
MessageAck.DELIVERED_ACK_TYPE : MessageAck.STANDARD_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
acknowledge(md);
}
} }
} }
} }
@ -1159,13 +1170,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return lastDeliveredSequenceId; return lastDeliveredSequenceId;
} }
// on resumption re deliveries will percolate acks in their own good time
public void transportResumed() {
pendingAck = null;
additionalWindowSize = 0;
deliveredCounter = 0;
}
public IOException getFailureError() { public IOException getFailureError() {
return failureError; return failureError;
} }

View File

@ -1962,12 +1962,4 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
syncSendPacket(ack); syncSendPacket(ack);
} }
} }
public void transportResumed() {
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = iter.next();
consumer.transportResumed();
}
}
} }

View File

@ -39,8 +39,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.LoggingBrokerPlugin; import org.apache.activemq.broker.util.LoggingBrokerPlugin;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@ -64,19 +62,20 @@ public class AMQ2149Test extends TestCase {
private final String SEQ_NUM_PROPERTY = "seqNum"; private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75 * 1024; final int MESSAGE_LENGTH_BYTES = 75 * 1024;
final long SLEEP_BETWEEN_SEND_MS = 3; final long SLEEP_BETWEEN_SEND_MS = 25;
final int NUM_SENDERS_AND_RECEIVERS = 10; final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object(); final Object brokerLock = new Object();
private static final long DEFAULT_BROKER_STOP_PERIOD = 20 * 1000; private static final long DEFAULT_BROKER_STOP_PERIOD = 20 * 1000;
private static final long DEFAULT_NUM_TO_SEND = 1500; private static final long DEFAULT_NUM_TO_SEND = 1400;
long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
long numtoSend = DEFAULT_NUM_TO_SEND; long numtoSend = DEFAULT_NUM_TO_SEND;
long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
String brokerURL = DEFAULT_BROKER_URL; String brokerURL = DEFAULT_BROKER_URL;
int numBrokerRestarts = 0; int numBrokerRestarts = 0;
final static int MAX_BROKER_RESTARTS = 4; final static int MAX_BROKER_RESTARTS = 5;
BrokerService broker; BrokerService broker;
Vector<Throwable> exceptions = new Vector<Throwable>(); Vector<Throwable> exceptions = new Vector<Throwable>();
@ -110,6 +109,7 @@ public class AMQ2149Test extends TestCase {
dataDirFile = new File("target/"+ getName()); dataDirFile = new File("target/"+ getName());
numtoSend = DEFAULT_NUM_TO_SEND; numtoSend = DEFAULT_NUM_TO_SEND;
brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
brokerURL = DEFAULT_BROKER_URL; brokerURL = DEFAULT_BROKER_URL;
} }
@ -244,9 +244,9 @@ public class AMQ2149Test extends TestCase {
LOG.error(dest + " send error", e); LOG.error(dest + " send error", e);
exceptions.add(e); exceptions.add(e);
} }
if (SLEEP_BETWEEN_SEND_MS > 0) { if (sleepBetweenSend > 0) {
try { try {
Thread.sleep(SLEEP_BETWEEN_SEND_MS); Thread.sleep(sleepBetweenSend);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn(dest + " sleep interrupted", e); LOG.warn(dest + " sleep interrupted", e);
} }
@ -301,7 +301,7 @@ public class AMQ2149Test extends TestCase {
} }
public void x_testOrderWithRestart() throws Exception { public void testOrderWithRestart() throws Exception {
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages(); broker.deleteAllMessages();
@ -323,7 +323,7 @@ public class AMQ2149Test extends TestCase {
verifyStats(true); verifyStats(true);
} }
public void x_testTopicOrderWithRestart() throws Exception { public void testTopicOrderWithRestart() throws Exception {
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages(); broker.deleteAllMessages();
@ -351,7 +351,8 @@ public class AMQ2149Test extends TestCase {
} }
public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
numtoSend = 15000; numtoSend = 10000;
sleepBetweenSend = 3;
brokerStopPeriod = 30 * 1000; brokerStopPeriod = 30 * 1000;
createBroker(new Configurer() { createBroker(new Configurer() {