From 1d1c9262fdd224dcca744b729bebe8cf4e13c3ef Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 25 Aug 2016 15:01:19 +0100 Subject: [PATCH] AMQ-6406 - ensure duplicates trapped by the cursor-add or queue-page-in are removed from the message store (cherry picked from commit 2b1cda196471280c4fc587d8664d6373e18c97ca) --- .../apache/activemq/broker/region/Queue.java | 16 +- .../region/cursors/AbstractStoreCursor.java | 18 +- .../failover/FailoverTransactionTest.java | 44 ++-- .../TwoBrokerQueueClientsReconnectTest.java | 206 +++++++++++++++++- 4 files changed, 247 insertions(+), 37 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 318f558db4..a74fe3b894 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -96,6 +96,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore; + /** * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. @@ -1970,14 +1972,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index resultList.addMessageLast(ref); } else { ref.decrementReferenceCount(); - // store should have trapped duplicate in it's index, also cursor audit - // we need to remove the duplicate from the store in the knowledge that the original message may be inflight + // store should have trapped duplicate in it's index, or cursor audit trapped insert + // or producerBrokerExchange suppressed send. // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id - LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage()); + LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessage()); if (store != null) { ConnectionContext connectionContext = createConnectionContext(); - store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); - broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination)); + dropMessage(ref); + if (gotToTheStore(ref.getMessage())) { + LOG.debug("Duplicate message {} from cursor, removing from store", this, ref.getMessage()); + store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); + } + broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination)); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 06bae97572..4d7ffead3e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -121,14 +121,28 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } else { LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); - if (message.getMessageId().getEntryLocator() instanceof Long) { - // JDBC will store a duplicate (with new sequence id) - it needs an ack (AMQ4952Test) + if (gotToTheStore(message)) { duplicate(message); } } return recovered; } + public static boolean gotToTheStore(Message message) throws Exception { + if (message.isRecievedByDFBridge()) { + // concurrent store and dispatch - wait to see if the message gets to the store to see + // if the index suppressed it (original still present), or whether it was stored and needs to be removed + Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); + if (possibleFuture instanceof Future) { + ((Future) possibleFuture).get(); + } + // need to access again after wait on future + Object sequence = message.getMessageId().getFutureOrSequenceLong(); + return (sequence != null && sequence instanceof Long && Long.compare((Long) sequence, -1l) != 0); + } + return true; + } + // track for processing outside of store index lock so we can dlq final LinkedList duplicatesFromStore = new LinkedList(); private void duplicate(Message message) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 717425f4d4..a7f8cbb682 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -809,15 +809,10 @@ public class FailoverTransactionTest extends TestSupport { LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); try { consumerSession1.commit(); - } catch (JMSException expectedSometimes) { - LOG.info("got exception ex on commit", expectedSometimes); - if (expectedSometimes instanceof TransactionRolledBackException) { - gotTransactionRolledBackException.set(true); - // ok, message one was not replayed so we expect the rollback - } else { - throw expectedSometimes; - } - + } catch (TransactionRolledBackException expected) { + LOG.info("got exception ex on commit", expected); + gotTransactionRolledBackException.set(true); + // ok, message one was not replayed so we expect the rollback } commitDoneLatch.countDown(); LOG.info("done async commit"); @@ -837,24 +832,17 @@ public class FailoverTransactionTest extends TestSupport { LOG.info("received message count: " + receivedMessages.size()); - // new transaction - Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); - LOG.info("post: from consumer1 received: " + msg); - if (gotTransactionRolledBackException.get()) { - assertNotNull("should be available again after commit rollback ex", msg); - } else { - assertNull("should be nothing left for consumer as receive should have committed", msg); - } - consumerSession1.commit(); - - if (gotTransactionRolledBackException.get() || - !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) { - // just one message successfully consumed or none consumed - // consumer2 should get other message - msg = consumer2.receive(10000); - LOG.info("post: from consumer2 received: " + msg); - assertNotNull("got second message on consumer2", msg); - consumerSession2.commit(); + // new transaction to get both messages from either consumer + for (int i=0; i<2; i++) { + Message msg = consumer1.receive(5000); + LOG.info("post: from consumer1 received: " + msg); + consumerSession1.commit(); + if (msg == null) { + msg = consumer2.receive(10000); + LOG.info("post: from consumer2 received: " + msg); + consumerSession2.commit(); + } + assertNotNull("got message [" + i + "]", msg); } for (Connection c : connections) { @@ -877,7 +865,7 @@ public class FailoverTransactionTest extends TestSupport { connection.start(); Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer sweeper = sweeperSession.createConsumer(destination); - msg = sweeper.receive(1000); + Message msg = sweeper.receive(1000); LOG.info("Sweep received: " + msg); assertNull("no messges left dangling but got: " + msg, msg); connection.close(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java index 9adc2a3884..1498c06557 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java @@ -444,6 +444,83 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu })); } + + @SuppressWarnings("unchecked") + public void testDuplicateSendWithCursorAudit() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + brokers.get(broker2).broker.getDestinationPolicy().getDefaultEntry().setEnableAudit(true); + + bridgeBrokers(broker1, broker2); + + final AtomicBoolean first = new AtomicBoolean(); + final CountDownLatch gotMessageLatch = new CountDownLatch(1); + + BrokerService brokerService = brokers.get(broker2).broker; + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void send(final ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + super.send(producerExchange, messageSend); + if (first.compareAndSet(false, true)) { + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Waiting for recepit"); + assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); + LOG.info("Stopping connection post send and receive and multiple producers"); + producerExchange.getConnectionContext().getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + + // Run brokers + startAllBrokers(); + + waitForBridgeFormation(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + MessageConsumer client2 = createConsumer(broker2, dest); + + sendMessages("BrokerA", dest, 1); + + assertEquals("Client got message", 1, receiveExactMessages(client2, 1)); + client2.close(); + gotMessageLatch.countDown(); + + // message still pending on broker1 + assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount()); + + client2 = createConsumer(broker2, dest); + + LOG.info("Let the second client receive the rest of the messages"); + assertEquals("no duplicate message", 0, receiveAllMessages(client2)); + assertEquals("no duplicate message", 0, receiveAllMessages(client2)); + + assertEquals("1 messages enqueued on dlq", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount()); + assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount(); + } + })); + } + @SuppressWarnings("unchecked") public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception { broker1 = "BrokerA"; @@ -527,6 +604,128 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount()); } + @SuppressWarnings("unchecked") + public void testDuplicateSendWithNoAuditEnqueueCountStatConcurrentStoreAndDispatch() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + NetworkConnector networkConnector = bridgeBrokers(broker1, broker2); + + final AtomicBoolean first = new AtomicBoolean(); + final CountDownLatch gotMessageLatch = new CountDownLatch(1); + + BrokerService brokerService = brokers.get(broker2).broker; + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void send(final ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + super.send(producerExchange, messageSend); + if (first.compareAndSet(false, true)) { + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Waiting for recepit"); + assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); + LOG.info("Stopping connection post send and receive by local queue over bridge"); + producerExchange.getConnectionContext().getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + + // Create queue + final ActiveMQDestination dest = createDestination("TEST.FOO", false); + + // statically include our destination + networkConnector.addStaticallyIncludedDestination(dest); + + // Run brokers + startAllBrokers(); + + waitForBridgeFormation(); + + sendMessages("BrokerA", dest, 1); + + // wait for broker2 to get the initial forward + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1; + } + }); + + // message still pending on broker1 + assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount()); + + // allow the bridge to be shutdown and restarted + gotMessageLatch.countDown(); + + + // verify message is forwarded after restart + assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount(); + } + })); + + // duplicate ready to dispatch + assertEquals("one messages pending", 2, brokers.get(broker2).broker.getAdminView().getTotalMessageCount()); + assertEquals("one messages enqueued", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("one messages", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount()); + + // only one message available in the store... + + Connection conn = createConnection(broker2); + conn.start(); + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = sess.createConsumer(dest); + assertEquals("Client got message", 1, receiveExactMessages(messageConsumer, 1)); + messageConsumer.close(); // no ack + + assertTrue("1 messages enqueued on origin", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount(); + } + })); + + // restart to validate message not acked due to duplicate processing + // consume again and ack + destroyAllBrokers(); + + createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?useJmx=true&advisorySupport=false")).start(); + + assertEquals("Receive after restart and previous receive unacked", 1, receiveExactMessages(createConsumer(broker2, dest), 1)); + + assertTrue("no messages enqueued", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount(); + } + })); + + final ActiveMQDestination dlq = createDestination("ActiveMQ.DLQ", false); + assertTrue("one message still on dlq", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == brokers.get(broker2).broker.getDestination(dlq).getDestinationStatistics().getMessages().getCount(); + } + })); + + } + protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { Message msg; int i; @@ -567,6 +766,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu protected void configureBroker(BrokerService broker) { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); defaultEntry.setEnableAudit(false); policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); @@ -584,8 +784,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); - createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=true")); - createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true")); + createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=true")); + createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=true")); // Configure broker connection factory ActiveMQConnectionFactory factoryA; @@ -600,6 +800,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu factoryA.setPrefetchPolicy(policy); factoryB.setPrefetchPolicy(policy); + factoryA.setWatchTopicAdvisories(false); + factoryB.setWatchTopicAdvisories(false); msgsClient1 = 0; msgsClient2 = 0; }