mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4665 - fix auto ack on duplicate, now use poison ack. For client ack allow replay after failover. Additional tests to validate new behaviour
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1511307 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0061f6f755
commit
82c4ab83ed
|
@ -121,7 +121,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// The are the messages that were delivered to the consumer but that have
|
// The are the messages that were delivered to the consumer but that have
|
||||||
// not been acknowledged. It's kept in reverse order since we
|
// not been acknowledged. It's kept in reverse order since we
|
||||||
// Always walk list in reverse order.
|
// Always walk list in reverse order.
|
||||||
private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
|
protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
|
||||||
// track duplicate deliveries in a transaction such that the tx integrity can be validated
|
// track duplicate deliveries in a transaction such that the tx integrity can be validated
|
||||||
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
|
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
|
||||||
private int deliveredCounter;
|
private int deliveredCounter;
|
||||||
|
@ -143,7 +143,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
|
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
private MessageTransformer transformer;
|
private MessageTransformer transformer;
|
||||||
private boolean clearDispatchList;
|
private boolean clearDeliveredList;
|
||||||
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
|
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
|
||||||
|
|
||||||
private MessageAck pendingAck;
|
private MessageAck pendingAck;
|
||||||
|
@ -704,7 +704,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
void inProgressClearRequired() {
|
void inProgressClearRequired() {
|
||||||
inProgressClearRequiredFlag.incrementAndGet();
|
inProgressClearRequiredFlag.incrementAndGet();
|
||||||
// deal with delivered messages async to avoid lock contention with in progress acks
|
// deal with delivered messages async to avoid lock contention with in progress acks
|
||||||
clearDispatchList = true;
|
clearDeliveredList = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearMessagesInProgress() {
|
void clearMessagesInProgress() {
|
||||||
|
@ -730,6 +730,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
clearDeliveredList();
|
||||||
}
|
}
|
||||||
|
|
||||||
void deliverAcks() {
|
void deliverAcks() {
|
||||||
|
@ -818,6 +819,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
if (!this.info.isBrowser()) {
|
if (!this.info.isBrowser()) {
|
||||||
for (MessageDispatch old : list) {
|
for (MessageDispatch old : list) {
|
||||||
// ensure we don't filter this as a duplicate
|
// ensure we don't filter this as a duplicate
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("on close, rollback: " + old.getMessage().getMessageId());
|
||||||
|
}
|
||||||
session.connection.rollbackDuplicate(this, old.getMessage());
|
session.connection.rollbackDuplicate(this, old.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -838,7 +842,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* broker to pull a message we are about to receive
|
* broker to pull a message we are about to receive
|
||||||
*/
|
*/
|
||||||
protected void sendPullCommand(long timeout) throws JMSException {
|
protected void sendPullCommand(long timeout) throws JMSException {
|
||||||
clearDispatchList();
|
clearDeliveredList();
|
||||||
if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
|
if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
|
||||||
MessagePull messagePull = new MessagePull();
|
MessagePull messagePull = new MessagePull();
|
||||||
messagePull.configure(info);
|
messagePull.configure(info);
|
||||||
|
@ -1010,6 +1014,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// AMQ-3956 evaluate both expired and normal msgs as
|
// AMQ-3956 evaluate both expired and normal msgs as
|
||||||
// otherwise consumer may get stalled
|
// otherwise consumer may get stalled
|
||||||
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
|
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("ackLater: sending: " + pendingAck);
|
||||||
|
}
|
||||||
session.sendAck(pendingAck);
|
session.sendAck(pendingAck);
|
||||||
pendingAck=null;
|
pendingAck=null;
|
||||||
deliveredCounter = 0;
|
deliveredCounter = 0;
|
||||||
|
@ -1025,7 +1032,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
@Override
|
@Override
|
||||||
public void beforeEnd() throws Exception {
|
public void beforeEnd() throws Exception {
|
||||||
if (transactedIndividualAck) {
|
if (transactedIndividualAck) {
|
||||||
clearDispatchList();
|
clearDeliveredList();
|
||||||
waitForRedeliveries();
|
waitForRedeliveries();
|
||||||
synchronized(deliveredMessages) {
|
synchronized(deliveredMessages) {
|
||||||
rollbackOnFailedRecoveryRedelivery();
|
rollbackOnFailedRecoveryRedelivery();
|
||||||
|
@ -1058,7 +1065,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
public void acknowledge() throws JMSException {
|
public void acknowledge() throws JMSException {
|
||||||
clearDispatchList();
|
clearDeliveredList();
|
||||||
waitForRedeliveries();
|
waitForRedeliveries();
|
||||||
synchronized(deliveredMessages) {
|
synchronized(deliveredMessages) {
|
||||||
// Acknowledge all messages so far.
|
// Acknowledge all messages so far.
|
||||||
|
@ -1162,7 +1169,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rollback() throws JMSException {
|
public void rollback() throws JMSException {
|
||||||
clearDispatchList();
|
clearDeliveredList();
|
||||||
synchronized (unconsumedMessages.getMutex()) {
|
synchronized (unconsumedMessages.getMutex()) {
|
||||||
if (optimizeAcknowledge) {
|
if (optimizeAcknowledge) {
|
||||||
// remove messages read but not acked at the broker yet through
|
// remove messages read but not acked at the broker yet through
|
||||||
|
@ -1301,6 +1308,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
if (previouslyDeliveredMessages != null) {
|
if (previouslyDeliveredMessages != null) {
|
||||||
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
|
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
|
||||||
if (!entry.getValue()) {
|
if (!entry.getValue()) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("rollback non redelivered: " + entry.getKey());
|
||||||
|
}
|
||||||
removeFromDeliveredMessages(entry.getKey());
|
removeFromDeliveredMessages(entry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1338,7 +1348,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
MessageListener listener = this.messageListener.get();
|
MessageListener listener = this.messageListener.get();
|
||||||
try {
|
try {
|
||||||
clearMessagesInProgress();
|
clearMessagesInProgress();
|
||||||
clearDispatchList();
|
clearDeliveredList();
|
||||||
synchronized (unconsumedMessages.getMutex()) {
|
synchronized (unconsumedMessages.getMutex()) {
|
||||||
if (!unconsumedMessages.isClosed()) {
|
if (!unconsumedMessages.isClosed()) {
|
||||||
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
|
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
|
||||||
|
@ -1375,10 +1385,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!session.isTransacted()) {
|
if (!session.isTransacted()) {
|
||||||
LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
|
LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md);
|
||||||
+ " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
|
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
|
||||||
MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
|
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
session.sendAck(ack);
|
poisonAck.setPoisonCause(new Throwable("Duplicate non transacted delivery to " + getConsumerId()));
|
||||||
|
session.sendAck(poisonAck);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
|
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
|
||||||
|
@ -1423,22 +1434,35 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
|
|
||||||
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
|
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
|
||||||
private void clearDispatchList() {
|
private void clearDeliveredList() {
|
||||||
if (clearDispatchList) {
|
if (clearDeliveredList) {
|
||||||
synchronized (deliveredMessages) {
|
synchronized (deliveredMessages) {
|
||||||
if (clearDispatchList) {
|
if (clearDeliveredList) {
|
||||||
if (!deliveredMessages.isEmpty()) {
|
if (!deliveredMessages.isEmpty()) {
|
||||||
if (session.isTransacted()) {
|
if (session.isTransacted()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
|
|
||||||
}
|
|
||||||
if (previouslyDeliveredMessages == null) {
|
if (previouslyDeliveredMessages == null) {
|
||||||
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
|
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
|
||||||
}
|
}
|
||||||
for (MessageDispatch delivered : deliveredMessages) {
|
for (MessageDispatch delivered : deliveredMessages) {
|
||||||
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
|
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
|
||||||
}
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(getConsumerId() + " tracking existing transacted " + previouslyDeliveredMessages.transactionId +
|
||||||
|
" delivered list (" + deliveredMessages.size() + ") on transport interrupt");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (session.isClientAcknowledge()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(getConsumerId() + " rolling back delivered list (" + deliveredMessages.size() + ") on transport interrupt");
|
||||||
|
}
|
||||||
|
// allow redelivery
|
||||||
|
if (!this.info.isBrowser()) {
|
||||||
|
for (MessageDispatch md: deliveredMessages) {
|
||||||
|
this.session.connection.rollbackDuplicate(this, md.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
|
LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
|
||||||
}
|
}
|
||||||
|
@ -1446,7 +1470,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
pendingAck = null;
|
pendingAck = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clearDispatchList = false;
|
clearDeliveredList = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,9 +41,6 @@ import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.util.LoggingBrokerPlugin;
|
import org.apache.activemq.broker.util.LoggingBrokerPlugin;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.leveldb.LevelDBStore;
|
|
||||||
import org.apache.activemq.usage.MemoryUsage;
|
|
||||||
import org.apache.activemq.usage.SystemUsage;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -75,7 +72,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
|
||||||
String brokerURL = DEFAULT_BROKER_URL;
|
String brokerURL = DEFAULT_BROKER_URL;
|
||||||
|
|
||||||
int numBrokerRestarts = 0;
|
int numBrokerRestarts = 0;
|
||||||
final static int MAX_BROKER_RESTARTS = 3;
|
final static int MAX_BROKER_RESTARTS = 4;
|
||||||
BrokerService broker;
|
BrokerService broker;
|
||||||
Vector<Throwable> exceptions = new Vector<Throwable>();
|
Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||||
|
|
||||||
|
@ -171,6 +168,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
final int TRANSACITON_BATCH = 500;
|
final int TRANSACITON_BATCH = 500;
|
||||||
|
boolean resumeOnNextOrPreviousIsOk = false;
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
try {
|
try {
|
||||||
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
|
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
|
||||||
|
@ -182,6 +180,16 @@ public class AMQ2149Test extends AutoFailTestSupport {
|
||||||
session.commit();
|
session.commit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (resumeOnNextOrPreviousIsOk) {
|
||||||
|
// after an indoubt commit we need to accept what we get (within reason)
|
||||||
|
if (seqNum != nextExpectedSeqNum) {
|
||||||
|
if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) {
|
||||||
|
nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
|
||||||
|
LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resumeOnNextOrPreviousIsOk = false;
|
||||||
|
}
|
||||||
if (seqNum != nextExpectedSeqNum) {
|
if (seqNum != nextExpectedSeqNum) {
|
||||||
LOG.warn(dest + " received " + seqNum
|
LOG.warn(dest + " received " + seqNum
|
||||||
+ " in msg: " + message.getJMSMessageID()
|
+ " in msg: " + message.getJMSMessageID()
|
||||||
|
@ -196,8 +204,16 @@ public class AMQ2149Test extends AutoFailTestSupport {
|
||||||
lastId = message.getJMSMessageID();
|
lastId = message.getJMSMessageID();
|
||||||
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
|
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
|
||||||
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
|
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
|
||||||
// batch will be replayed
|
if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
|
||||||
nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
|
// in doubt - either commit command or reply missing
|
||||||
|
// don't know if we will get a replay
|
||||||
|
resumeOnNextOrPreviousIsOk = true;
|
||||||
|
} else {
|
||||||
|
resumeOnNextOrPreviousIsOk = false;
|
||||||
|
// batch will be replayed
|
||||||
|
nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.error(dest + " onMessage error", e);
|
LOG.error(dest + " onMessage error", e);
|
||||||
exceptions.add(e);
|
exceptions.add(e);
|
||||||
|
@ -327,7 +343,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
|
||||||
public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
|
public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
|
||||||
numtoSend = 10000;
|
numtoSend = 10000;
|
||||||
sleepBetweenSend = 3;
|
sleepBetweenSend = 3;
|
||||||
brokerStopPeriod = 30 * 1000;
|
brokerStopPeriod = 10 * 1000;
|
||||||
|
|
||||||
createBroker(new Configurer() {
|
createBroker(new Configurer() {
|
||||||
public void configure(BrokerService broker) throws Exception {
|
public void configure(BrokerService broker) throws Exception {
|
||||||
|
|
|
@ -289,11 +289,14 @@ public class FailoverConsumerOutstandingCommitTest {
|
||||||
assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
|
assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
|
||||||
assertTrue("commit failed", gotCommitException.get());
|
assertTrue("commit failed", gotCommitException.get());
|
||||||
assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS));
|
assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS));
|
||||||
assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(0).getText());
|
int receivedIndex = 0;
|
||||||
// it was redelivered
|
assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText());
|
||||||
assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText());
|
if (!doActualBrokerCommit) {
|
||||||
|
// it will be redelivered and not tracked as a duplicate
|
||||||
|
assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText());
|
||||||
|
}
|
||||||
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
|
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
|
||||||
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
|
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText());
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -95,6 +97,141 @@ public class FailoverConsumerUnconsumedTest {
|
||||||
doTestFailoverConsumerDups(false);
|
doTestFailoverConsumerDups(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testFailoverClientAckMissingRedelivery() throws Exception {
|
||||||
|
|
||||||
|
final int maxConsumers = 2;
|
||||||
|
broker = createBroker(true);
|
||||||
|
|
||||||
|
broker.setPlugins(new BrokerPlugin[] {
|
||||||
|
new BrokerPluginSupport() {
|
||||||
|
int consumerCount;
|
||||||
|
|
||||||
|
// broker is killed on x create consumer
|
||||||
|
@Override
|
||||||
|
public Subscription addConsumer(ConnectionContext context,
|
||||||
|
final ConsumerInfo info) throws Exception {
|
||||||
|
if (++consumerCount == maxConsumers) {
|
||||||
|
context.setDontSendReponse(true);
|
||||||
|
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
LOG.info("Stopping broker on consumer: " + info.getConsumerId());
|
||||||
|
try {
|
||||||
|
broker.stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return super.addConsumer(context, info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||||
|
cf.setWatchTopicAdvisories(false);
|
||||||
|
|
||||||
|
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
|
||||||
|
|
||||||
|
final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
|
||||||
|
TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
|
||||||
|
testConsumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
LOG.info("onMessage:" + message.getJMSMessageID());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
testConsumers.add(testConsumer);
|
||||||
|
|
||||||
|
|
||||||
|
produceMessage(consumerSession, destination, maxConsumers * prefetch);
|
||||||
|
|
||||||
|
assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
int totalDelivered = 0;
|
||||||
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
long delivered = testConsumer.deliveredSize();
|
||||||
|
LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
|
||||||
|
totalDelivered += delivered;
|
||||||
|
}
|
||||||
|
return totalDelivered == maxConsumers * prefetch;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
||||||
|
|
||||||
|
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
LOG.info("add last consumer...");
|
||||||
|
TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
|
||||||
|
testConsumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
LOG.info("onMessage:" + message.getJMSMessageID());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
testConsumers.add(testConsumer);
|
||||||
|
shutdownConsumerAdded.countDown();
|
||||||
|
LOG.info("done add last consumer");
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// will be stopped by the plugin
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
|
broker = createBroker(false, this.url);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// each should again get prefetch messages - all unacked deliveries should be rolledback
|
||||||
|
assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
int totalDelivered = 0;
|
||||||
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
long delivered = testConsumer.deliveredSize();
|
||||||
|
LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
|
||||||
|
totalDelivered += delivered;
|
||||||
|
}
|
||||||
|
return totalDelivered == maxConsumers * prefetch;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
long delivered = testConsumer.deliveredSize();
|
||||||
|
LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
|
||||||
|
if (delivered != prefetch) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
|
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
|
||||||
|
|
||||||
|
@ -156,14 +293,14 @@ public class FailoverConsumerUnconsumedTest {
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
||||||
|
|
||||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
LOG.info("add last consumer...");
|
LOG.info("add last consumer...");
|
||||||
testConsumers.add(new TestConsumer(consumerSession, destination, connection));
|
testConsumers.add(new TestConsumer(consumerSession, destination, connection));
|
||||||
commitDoneLatch.countDown();
|
shutdownConsumerAdded.countDown();
|
||||||
LOG.info("done add last consumer");
|
LOG.info("done add last consumer");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -190,7 +327,7 @@ public class FailoverConsumerUnconsumedTest {
|
||||||
broker = createBroker(false, this.url);
|
broker = createBroker(false, this.url);
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
|
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
|
||||||
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@ -231,6 +368,10 @@ public class FailoverConsumerUnconsumedTest {
|
||||||
public int unconsumedSize() {
|
public int unconsumedSize() {
|
||||||
return unconsumedMessages.size();
|
return unconsumedMessages.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int deliveredSize() {
|
||||||
|
return deliveredMessages.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static long idGen = 100;
|
static long idGen = 100;
|
||||||
|
|
|
@ -203,7 +203,7 @@ public class FailoverDuplicateTest extends TestSupport {
|
||||||
receiveConnection.close();
|
receiveConnection.close();
|
||||||
|
|
||||||
// verify stats
|
// verify stats
|
||||||
assertEquals("expect all messages are dequeued with one duplicate", totalSent +1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent + 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
|
||||||
|
|
||||||
Wait.waitFor(new Wait.Condition() {
|
Wait.waitFor(new Wait.Condition() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -212,7 +212,7 @@ public class FailoverDuplicateTest extends TestSupport {
|
||||||
return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
|
return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
assertEquals("dequeue correct, including duplicate dispatch auto acked", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
|
assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
|
||||||
|
|
||||||
// ensure no dangling messages with fresh broker etc
|
// ensure no dangling messages with fresh broker etc
|
||||||
broker.stop();
|
broker.stop();
|
||||||
|
|
Loading…
Reference in New Issue