mirror of
https://github.com/apache/activemq.git
synced 2025-02-10 12:06:05 +00:00
AMQ-6406 - ensure duplicates trapped by the cursor-add or queue-page-in are removed from the message store
(cherry picked from commit 2b1cda196471280c4fc587d8664d6373e18c97ca)
This commit is contained in:
parent
cf004c205d
commit
1d1c9262fd
@ -96,6 +96,8 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore;
|
||||
|
||||
/**
|
||||
* The Queue is a List of MessageEntry objects that are dispatched to matching
|
||||
* subscriptions.
|
||||
@ -1970,14 +1972,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||
resultList.addMessageLast(ref);
|
||||
} else {
|
||||
ref.decrementReferenceCount();
|
||||
// store should have trapped duplicate in it's index, also cursor audit
|
||||
// we need to remove the duplicate from the store in the knowledge that the original message may be inflight
|
||||
// store should have trapped duplicate in it's index, or cursor audit trapped insert
|
||||
// or producerBrokerExchange suppressed send.
|
||||
// note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
|
||||
LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage());
|
||||
LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessage());
|
||||
if (store != null) {
|
||||
ConnectionContext connectionContext = createConnectionContext();
|
||||
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
|
||||
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination));
|
||||
dropMessage(ref);
|
||||
if (gotToTheStore(ref.getMessage())) {
|
||||
LOG.debug("Duplicate message {} from cursor, removing from store", this, ref.getMessage());
|
||||
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
|
||||
}
|
||||
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,14 +121,28 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||
}
|
||||
} else {
|
||||
LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
|
||||
if (message.getMessageId().getEntryLocator() instanceof Long) {
|
||||
// JDBC will store a duplicate (with new sequence id) - it needs an ack (AMQ4952Test)
|
||||
if (gotToTheStore(message)) {
|
||||
duplicate(message);
|
||||
}
|
||||
}
|
||||
return recovered;
|
||||
}
|
||||
|
||||
public static boolean gotToTheStore(Message message) throws Exception {
|
||||
if (message.isRecievedByDFBridge()) {
|
||||
// concurrent store and dispatch - wait to see if the message gets to the store to see
|
||||
// if the index suppressed it (original still present), or whether it was stored and needs to be removed
|
||||
Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
|
||||
if (possibleFuture instanceof Future) {
|
||||
((Future) possibleFuture).get();
|
||||
}
|
||||
// need to access again after wait on future
|
||||
Object sequence = message.getMessageId().getFutureOrSequenceLong();
|
||||
return (sequence != null && sequence instanceof Long && Long.compare((Long) sequence, -1l) != 0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// track for processing outside of store index lock so we can dlq
|
||||
final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>();
|
||||
private void duplicate(Message message) {
|
||||
|
@ -809,15 +809,10 @@ public class FailoverTransactionTest extends TestSupport {
|
||||
LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
|
||||
try {
|
||||
consumerSession1.commit();
|
||||
} catch (JMSException expectedSometimes) {
|
||||
LOG.info("got exception ex on commit", expectedSometimes);
|
||||
if (expectedSometimes instanceof TransactionRolledBackException) {
|
||||
gotTransactionRolledBackException.set(true);
|
||||
// ok, message one was not replayed so we expect the rollback
|
||||
} else {
|
||||
throw expectedSometimes;
|
||||
}
|
||||
|
||||
} catch (TransactionRolledBackException expected) {
|
||||
LOG.info("got exception ex on commit", expected);
|
||||
gotTransactionRolledBackException.set(true);
|
||||
// ok, message one was not replayed so we expect the rollback
|
||||
}
|
||||
commitDoneLatch.countDown();
|
||||
LOG.info("done async commit");
|
||||
@ -837,24 +832,17 @@ public class FailoverTransactionTest extends TestSupport {
|
||||
|
||||
LOG.info("received message count: " + receivedMessages.size());
|
||||
|
||||
// new transaction
|
||||
Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
|
||||
LOG.info("post: from consumer1 received: " + msg);
|
||||
if (gotTransactionRolledBackException.get()) {
|
||||
assertNotNull("should be available again after commit rollback ex", msg);
|
||||
} else {
|
||||
assertNull("should be nothing left for consumer as receive should have committed", msg);
|
||||
}
|
||||
consumerSession1.commit();
|
||||
|
||||
if (gotTransactionRolledBackException.get() ||
|
||||
!gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
|
||||
// just one message successfully consumed or none consumed
|
||||
// consumer2 should get other message
|
||||
msg = consumer2.receive(10000);
|
||||
LOG.info("post: from consumer2 received: " + msg);
|
||||
assertNotNull("got second message on consumer2", msg);
|
||||
consumerSession2.commit();
|
||||
// new transaction to get both messages from either consumer
|
||||
for (int i=0; i<2; i++) {
|
||||
Message msg = consumer1.receive(5000);
|
||||
LOG.info("post: from consumer1 received: " + msg);
|
||||
consumerSession1.commit();
|
||||
if (msg == null) {
|
||||
msg = consumer2.receive(10000);
|
||||
LOG.info("post: from consumer2 received: " + msg);
|
||||
consumerSession2.commit();
|
||||
}
|
||||
assertNotNull("got message [" + i + "]", msg);
|
||||
}
|
||||
|
||||
for (Connection c : connections) {
|
||||
@ -877,7 +865,7 @@ public class FailoverTransactionTest extends TestSupport {
|
||||
connection.start();
|
||||
Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer sweeper = sweeperSession.createConsumer(destination);
|
||||
msg = sweeper.receive(1000);
|
||||
Message msg = sweeper.receive(1000);
|
||||
LOG.info("Sweep received: " + msg);
|
||||
assertNull("no messges left dangling but got: " + msg, msg);
|
||||
connection.close();
|
||||
|
@ -444,6 +444,83 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDuplicateSendWithCursorAudit() throws Exception {
|
||||
broker1 = "BrokerA";
|
||||
broker2 = "BrokerB";
|
||||
|
||||
brokers.get(broker2).broker.getDestinationPolicy().getDefaultEntry().setEnableAudit(true);
|
||||
|
||||
bridgeBrokers(broker1, broker2);
|
||||
|
||||
final AtomicBoolean first = new AtomicBoolean();
|
||||
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
|
||||
|
||||
BrokerService brokerService = brokers.get(broker2).broker;
|
||||
brokerService.setPersistent(true);
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
brokerService.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
@Override
|
||||
public void send(final ProducerBrokerExchange producerExchange,
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
throws Exception {
|
||||
super.send(producerExchange, messageSend);
|
||||
if (first.compareAndSet(false, true)) {
|
||||
producerExchange.getConnectionContext().setDontSendReponse(true);
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Waiting for recepit");
|
||||
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
|
||||
LOG.info("Stopping connection post send and receive and multiple producers");
|
||||
producerExchange.getConnectionContext().getConnection().stop();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Run brokers
|
||||
startAllBrokers();
|
||||
|
||||
waitForBridgeFormation();
|
||||
|
||||
// Create queue
|
||||
Destination dest = createDestination("TEST.FOO", false);
|
||||
|
||||
MessageConsumer client2 = createConsumer(broker2, dest);
|
||||
|
||||
sendMessages("BrokerA", dest, 1);
|
||||
|
||||
assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
|
||||
client2.close();
|
||||
gotMessageLatch.countDown();
|
||||
|
||||
// message still pending on broker1
|
||||
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
|
||||
|
||||
client2 = createConsumer(broker2, dest);
|
||||
|
||||
LOG.info("Let the second client receive the rest of the messages");
|
||||
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
|
||||
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
|
||||
|
||||
assertEquals("1 messages enqueued on dlq", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
|
||||
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
|
||||
broker1 = "BrokerA";
|
||||
@ -527,6 +604,128 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||
assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDuplicateSendWithNoAuditEnqueueCountStatConcurrentStoreAndDispatch() throws Exception {
|
||||
broker1 = "BrokerA";
|
||||
broker2 = "BrokerB";
|
||||
|
||||
NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
|
||||
|
||||
final AtomicBoolean first = new AtomicBoolean();
|
||||
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
|
||||
|
||||
BrokerService brokerService = brokers.get(broker2).broker;
|
||||
brokerService.setPersistent(true);
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
brokerService.setPlugins(new BrokerPlugin[]{
|
||||
new BrokerPluginSupport() {
|
||||
@Override
|
||||
public void send(final ProducerBrokerExchange producerExchange,
|
||||
org.apache.activemq.command.Message messageSend)
|
||||
throws Exception {
|
||||
super.send(producerExchange, messageSend);
|
||||
if (first.compareAndSet(false, true)) {
|
||||
producerExchange.getConnectionContext().setDontSendReponse(true);
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Waiting for recepit");
|
||||
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
|
||||
LOG.info("Stopping connection post send and receive by local queue over bridge");
|
||||
producerExchange.getConnectionContext().getConnection().stop();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Create queue
|
||||
final ActiveMQDestination dest = createDestination("TEST.FOO", false);
|
||||
|
||||
// statically include our destination
|
||||
networkConnector.addStaticallyIncludedDestination(dest);
|
||||
|
||||
// Run brokers
|
||||
startAllBrokers();
|
||||
|
||||
waitForBridgeFormation();
|
||||
|
||||
sendMessages("BrokerA", dest, 1);
|
||||
|
||||
// wait for broker2 to get the initial forward
|
||||
Wait.waitFor(new Wait.Condition(){
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1;
|
||||
}
|
||||
});
|
||||
|
||||
// message still pending on broker1
|
||||
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
|
||||
|
||||
// allow the bridge to be shutdown and restarted
|
||||
gotMessageLatch.countDown();
|
||||
|
||||
|
||||
// verify message is forwarded after restart
|
||||
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
|
||||
}
|
||||
}));
|
||||
|
||||
// duplicate ready to dispatch
|
||||
assertEquals("one messages pending", 2, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
|
||||
assertEquals("one messages enqueued", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
|
||||
assertEquals("one messages", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount());
|
||||
|
||||
// only one message available in the store...
|
||||
|
||||
Connection conn = createConnection(broker2);
|
||||
conn.start();
|
||||
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
MessageConsumer messageConsumer = sess.createConsumer(dest);
|
||||
assertEquals("Client got message", 1, receiveExactMessages(messageConsumer, 1));
|
||||
messageConsumer.close(); // no ack
|
||||
|
||||
assertTrue("1 messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 1 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
|
||||
}
|
||||
}));
|
||||
|
||||
// restart to validate message not acked due to duplicate processing
|
||||
// consume again and ack
|
||||
destroyAllBrokers();
|
||||
|
||||
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?useJmx=true&advisorySupport=false")).start();
|
||||
|
||||
assertEquals("Receive after restart and previous receive unacked", 1, receiveExactMessages(createConsumer(broker2, dest), 1));
|
||||
|
||||
assertTrue("no messages enqueued", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
|
||||
}
|
||||
}));
|
||||
|
||||
final ActiveMQDestination dlq = createDestination("ActiveMQ.DLQ", false);
|
||||
assertTrue("one message still on dlq", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 1 == brokers.get(broker2).broker.getDestination(dlq).getDestinationStatistics().getMessages().getCount();
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
|
||||
Message msg;
|
||||
int i;
|
||||
@ -567,6 +766,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||
protected void configureBroker(BrokerService broker) {
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setExpireMessagesPeriod(0);
|
||||
defaultEntry.setEnableAudit(false);
|
||||
policyMap.setDefaultEntry(defaultEntry);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
@ -584,8 +784,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||
public void setUp() throws Exception {
|
||||
super.setAutoFail(true);
|
||||
super.setUp();
|
||||
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=true"));
|
||||
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true"));
|
||||
createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=true"));
|
||||
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=true"));
|
||||
|
||||
// Configure broker connection factory
|
||||
ActiveMQConnectionFactory factoryA;
|
||||
@ -600,6 +800,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||
factoryA.setPrefetchPolicy(policy);
|
||||
factoryB.setPrefetchPolicy(policy);
|
||||
|
||||
factoryA.setWatchTopicAdvisories(false);
|
||||
factoryB.setWatchTopicAdvisories(false);
|
||||
msgsClient1 = 0;
|
||||
msgsClient2 = 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user