mirror of https://github.com/apache/activemq.git
AMQ-7185 - track durable sub prepared acks in preCommit callback and release/remove on xa outcome, avoid duplicate delivery; fix and test
This commit is contained in:
parent
56ffcae3c7
commit
487d4a112e
|
@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.command.RemoveInfo;
|
import org.apache.activemq.command.RemoveInfo;
|
||||||
import org.apache.activemq.store.TopicMessageStore;
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
import org.apache.activemq.usage.SystemUsage;
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
import org.apache.activemq.usage.Usage;
|
import org.apache.activemq.usage.Usage;
|
||||||
import org.apache.activemq.usage.UsageListener;
|
import org.apache.activemq.usage.UsageListener;
|
||||||
|
@ -58,6 +60,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
private boolean keepDurableSubsActive;
|
private boolean keepDurableSubsActive;
|
||||||
private final AtomicBoolean active = new AtomicBoolean();
|
private final AtomicBoolean active = new AtomicBoolean();
|
||||||
private final AtomicLong offlineTimestamp = new AtomicLong(-1);
|
private final AtomicLong offlineTimestamp = new AtomicLong(-1);
|
||||||
|
private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
|
||||||
|
|
||||||
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
|
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
|
||||||
throws JMSException {
|
throws JMSException {
|
||||||
|
@ -319,16 +322,47 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean canDispatch(MessageReference node) {
|
protected boolean canDispatch(MessageReference node) {
|
||||||
|
if (!ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId())) {
|
||||||
|
return false; // prepared ack
|
||||||
|
}
|
||||||
return true; // let them go, our dispatchPending gates the active / inactive state.
|
return true; // let them go, our dispatchPending gates the active / inactive state.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
|
protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
|
||||||
this.setTimeOfLastMessageAck(System.currentTimeMillis());
|
this.setTimeOfLastMessageAck(System.currentTimeMillis());
|
||||||
Destination regionDestination = (Destination) node.getRegionDestination();
|
Destination regionDestination = (Destination) node.getRegionDestination();
|
||||||
regionDestination.acknowledge(context, this, ack, node);
|
regionDestination.acknowledge(context, this, ack, node);
|
||||||
redeliveredMessages.remove(node.getMessageId());
|
redeliveredMessages.remove(node.getMessageId());
|
||||||
node.decrementReferenceCount();
|
node.decrementReferenceCount();
|
||||||
|
if (context.isInTransaction() && context.getTransaction().getTransactionId().isXATransaction()) {
|
||||||
|
context.getTransaction().addSynchronization(new Synchronization() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeCommit() throws Exception {
|
||||||
|
// post xa prepare call
|
||||||
|
synchronized (pendingLock) {
|
||||||
|
ackedAndPrepared.add(node.getMessageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterCommit() throws Exception {
|
||||||
|
synchronized (pendingLock) {
|
||||||
|
ackedAndPrepared.remove(node.getMessageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterRollback() throws Exception {
|
||||||
|
synchronized (pendingLock) {
|
||||||
|
ackedAndPrepared.remove(node.getMessageId());
|
||||||
|
pending.addMessageFirst(node);
|
||||||
|
}
|
||||||
|
dispatchPending();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
|
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
|
||||||
if (info.isNetworkSubscription()) {
|
if (info.isNetworkSubscription()) {
|
||||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
|
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
|
||||||
|
@ -368,6 +402,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
node.decrementReferenceCount();
|
node.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
dispatched.clear();
|
dispatched.clear();
|
||||||
|
ackedAndPrepared.clear();
|
||||||
}
|
}
|
||||||
setSlowConsumer(false);
|
setSlowConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class XATransaction extends Transaction {
|
||||||
case PREPARED_STATE:
|
case PREPARED_STATE:
|
||||||
// 2 phase commit, work done.
|
// 2 phase commit, work done.
|
||||||
// We would record commit here.
|
// We would record commit here.
|
||||||
storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
|
storeCommit(getTransactionId(), true, null /* done post prepare call */, postCommitTask);
|
||||||
setStateFinished();
|
setStateFinished();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -198,6 +198,7 @@ public class XATransaction extends Transaction {
|
||||||
doPrePrepare();
|
doPrePrepare();
|
||||||
setState(Transaction.PREPARED_STATE);
|
setState(Transaction.PREPARED_STATE);
|
||||||
transactionStore.prepare(getTransactionId());
|
transactionStore.prepare(getTransactionId());
|
||||||
|
preCommitTask.run();
|
||||||
return XAResource.XA_OK;
|
return XAResource.XA_OK;
|
||||||
default:
|
default:
|
||||||
illegalStateTransition("prepare");
|
illegalStateTransition("prepare");
|
||||||
|
|
|
@ -545,7 +545,11 @@ public class TransactionContext implements XAResource {
|
||||||
// No risk for concurrent updates as we own the list now
|
// No risk for concurrent updates as we own the list now
|
||||||
if (l != null) {
|
if (l != null) {
|
||||||
for (TransactionContext ctx : l) {
|
for (TransactionContext ctx : l) {
|
||||||
ctx.afterRollback();
|
try {
|
||||||
|
ctx.afterRollback();
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
|
||||||
public boolean prioritySupport = true;
|
public boolean prioritySupport = true;
|
||||||
|
public boolean keepDurableSubsActive = false;
|
||||||
|
|
||||||
public void testPreparedJmxView() throws Exception {
|
public void testPreparedJmxView() throws Exception {
|
||||||
|
|
||||||
|
@ -1326,6 +1327,170 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
|
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void initCombosForTestTopicPersistentPreparedAcksUnavailableTillRollback() {
|
||||||
|
addCombinationValues("keepDurableSubsActive", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
|
||||||
|
|
||||||
|
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
|
||||||
|
|
||||||
|
// Setup the producer and send the message.
|
||||||
|
StubConnection connection = createConnection();
|
||||||
|
ConnectionInfo connectionInfo = createConnectionInfo();
|
||||||
|
connectionInfo.setClientId("durable");
|
||||||
|
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
|
||||||
|
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
|
||||||
|
connection.send(connectionInfo);
|
||||||
|
connection.send(sessionInfo);
|
||||||
|
connection.send(producerInfo);
|
||||||
|
|
||||||
|
// setup durable subs
|
||||||
|
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
|
||||||
|
consumerInfo.setSubscriptionName("durable");
|
||||||
|
connection.send(consumerInfo);
|
||||||
|
|
||||||
|
int numMessages = 4;
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
Message message = createMessage(producerInfo, destination);
|
||||||
|
message.setPersistent(true);
|
||||||
|
connection.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin the transaction.
|
||||||
|
XATransactionId txid = createXATransaction(sessionInfo);
|
||||||
|
connection.send(createBeginTransaction(connectionInfo, txid));
|
||||||
|
|
||||||
|
Message message = null;
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
message = receiveMessage(connection);
|
||||||
|
assertNotNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// one ack with last received, mimic a beforeEnd synchronization
|
||||||
|
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
|
||||||
|
ack.setTransactionId(txid);
|
||||||
|
connection.send(ack);
|
||||||
|
|
||||||
|
connection.request(createPrepareTransaction(connectionInfo, txid));
|
||||||
|
|
||||||
|
// reconnect, verify perpared acks unavailable
|
||||||
|
connection.request(closeConnectionInfo(connectionInfo));
|
||||||
|
|
||||||
|
LOG.info("new consumer for *no* redelivery");
|
||||||
|
|
||||||
|
connectionInfo = createConnectionInfo();
|
||||||
|
connectionInfo.setClientId("durable");
|
||||||
|
sessionInfo = createSessionInfo(connectionInfo);
|
||||||
|
connection.send(connectionInfo);
|
||||||
|
connection.send(sessionInfo);
|
||||||
|
|
||||||
|
// setup durable subs
|
||||||
|
consumerInfo = createConsumerInfo(sessionInfo, destination);
|
||||||
|
consumerInfo.setSubscriptionName("durable");
|
||||||
|
connection.send(consumerInfo);
|
||||||
|
|
||||||
|
message = receiveMessage(connection, 2000);
|
||||||
|
assertNull("unexpected non null", message);
|
||||||
|
|
||||||
|
// rollback original tx
|
||||||
|
connection.request(createRollbackTransaction(connectionInfo, txid));
|
||||||
|
|
||||||
|
// verify receive after rollback
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
message = receiveMessage(connection);
|
||||||
|
assertNotNull("unexpected null on:" + i, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsubscribe
|
||||||
|
connection.request(consumerInfo.createRemoveCommand());
|
||||||
|
RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
|
||||||
|
removeSubscriptionInfo.setClientId(connectionInfo.getClientId());
|
||||||
|
removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName());
|
||||||
|
connection.request(removeSubscriptionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestTopicPersistentPreparedAcksUnavailableTillComplete() {
|
||||||
|
addCombinationValues("keepDurableSubsActive", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTopicPersistentPreparedAcksUnavailableTillComplete() throws Exception {
|
||||||
|
|
||||||
|
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
|
||||||
|
|
||||||
|
// Setup the producer and send the message.
|
||||||
|
StubConnection connection = createConnection();
|
||||||
|
ConnectionInfo connectionInfo = createConnectionInfo();
|
||||||
|
connectionInfo.setClientId("durable");
|
||||||
|
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
|
||||||
|
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
|
||||||
|
connection.send(connectionInfo);
|
||||||
|
connection.send(sessionInfo);
|
||||||
|
connection.send(producerInfo);
|
||||||
|
|
||||||
|
// setup durable subs
|
||||||
|
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
|
||||||
|
consumerInfo.setSubscriptionName("durable");
|
||||||
|
connection.send(consumerInfo);
|
||||||
|
|
||||||
|
int numMessages = 4;
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
Message message = createMessage(producerInfo, destination);
|
||||||
|
message.setPersistent(true);
|
||||||
|
connection.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin the transaction.
|
||||||
|
XATransactionId txid = createXATransaction(sessionInfo);
|
||||||
|
connection.send(createBeginTransaction(connectionInfo, txid));
|
||||||
|
|
||||||
|
Message message = null;
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
message = receiveMessage(connection);
|
||||||
|
assertNotNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// one ack with last received, mimic a beforeEnd synchronization
|
||||||
|
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
|
||||||
|
ack.setTransactionId(txid);
|
||||||
|
connection.send(ack);
|
||||||
|
|
||||||
|
connection.request(createPrepareTransaction(connectionInfo, txid));
|
||||||
|
|
||||||
|
// reconnect, verify perpared acks unavailable
|
||||||
|
connection.request(closeConnectionInfo(connectionInfo));
|
||||||
|
|
||||||
|
LOG.info("new consumer for *no* redelivery");
|
||||||
|
|
||||||
|
connectionInfo = createConnectionInfo();
|
||||||
|
connectionInfo.setClientId("durable");
|
||||||
|
sessionInfo = createSessionInfo(connectionInfo);
|
||||||
|
connection.send(connectionInfo);
|
||||||
|
connection.send(sessionInfo);
|
||||||
|
|
||||||
|
// setup durable subs
|
||||||
|
consumerInfo = createConsumerInfo(sessionInfo, destination);
|
||||||
|
consumerInfo.setSubscriptionName("durable");
|
||||||
|
connection.send(consumerInfo);
|
||||||
|
|
||||||
|
message = receiveMessage(connection, 2000);
|
||||||
|
assertNull("unexpected non null", message);
|
||||||
|
|
||||||
|
// commit original tx
|
||||||
|
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
|
||||||
|
|
||||||
|
message = receiveMessage(connection, 2000);
|
||||||
|
assertNull("unexpected non null", message);
|
||||||
|
|
||||||
|
// unsubscribe
|
||||||
|
connection.request(consumerInfo.createRemoveCommand());
|
||||||
|
RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
|
||||||
|
removeSubscriptionInfo.setClientId(connectionInfo.getClientId());
|
||||||
|
removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName());
|
||||||
|
connection.request(removeSubscriptionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
|
private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
|
||||||
return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
|
return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
|
||||||
}
|
}
|
||||||
|
@ -1405,6 +1570,12 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
return policyEntry;
|
return policyEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configureBroker(BrokerService broker) throws Exception {
|
||||||
|
super.configureBroker(broker);
|
||||||
|
broker.setKeepDurableSubsActive(keepDurableSubsActive);
|
||||||
|
}
|
||||||
|
|
||||||
public static Test suite() {
|
public static Test suite() {
|
||||||
return suite(XARecoveryBrokerTest.class);
|
return suite(XARecoveryBrokerTest.class);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue