AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test

This commit is contained in:
gtully 2019-09-24 17:32:54 +01:00
parent 289750d7c9
commit 644b529ef6
6 changed files with 288 additions and 5 deletions

View File

@ -322,12 +322,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override
protected boolean canDispatch(MessageReference node) {
if (!ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId())) {
return false; // prepared ack
}
return true; // let them go, our dispatchPending gates the active / inactive state.
}
@Override
protected boolean trackedInPendingTransaction(MessageReference node) {
return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId());
}
@Override
protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
this.setTimeOfLastMessageAck(System.currentTimeMillis());
@ -349,6 +351,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override
public void afterCommit() throws Exception {
synchronized (pendingLock) {
// may be in the cursor post activate/load from the store
pending.remove(node);
ackedAndPrepared.remove(node.getMessageId());
}
}
@ -357,7 +361,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void afterRollback() throws Exception {
synchronized (pendingLock) {
ackedAndPrepared.remove(node.getMessageId());
pending.addMessageFirst(node);
}
dispatchPending();
}

View File

@ -641,6 +641,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node == null) {
break;
}
if (trackedInPendingTransaction(node)) {
node.decrementReferenceCount();
continue;
}
// Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
@ -685,6 +689,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
protected boolean trackedInPendingTransaction(MessageReference node) {
return false;
}
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
pending.setMaxBatchSize(numberToDispatch);
}

View File

@ -217,6 +217,11 @@ public class BrokerTestSupport extends CombinationTestSupport {
return info;
}
protected TransactionInfo createEndTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.END);
return info;
}
protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE);
return info;

View File

@ -1372,6 +1372,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createPrepareTransaction(connectionInfo, txid));
// reconnect, verify perpared acks unavailable
@ -1455,6 +1456,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createPrepareTransaction(connectionInfo, txid));
// reconnect, verify perpared acks unavailable
@ -1479,8 +1481,101 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// commit original tx
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
// verify still unavailable
message = receiveMessage(connection, 2000);
assertNull("unexpected non null", message);
assertNull("unexpected non null: " + message, message);
// unsubscribe
connection.request(consumerInfo.createRemoveCommand());
RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
removeSubscriptionInfo.setClientId(connectionInfo.getClientId());
removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName());
connection.request(removeSubscriptionInfo);
}
public void initCombosForTestNoDupOnRollbackRedelivery() {
addCombinationValues("keepDurableSubsActive", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
public void testNoDupOnRollbackRedelivery() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// setup durable subs
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
int numMessages = 1;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = null;
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createRollbackTransaction(connectionInfo, txid));
connection.send(consumerInfo.createRemoveCommand());
connection.send(sessionInfo.createRemoveCommand());
connection.send(connectionInfo.createRemoveCommand());
LOG.info("new connection/consumer for redelivery");
connection.request(closeConnectionInfo(connectionInfo));
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
// setup durable subs
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
message = receiveMessage(connection);
assertNotNull(message);
Message dup = receiveMessage(connection);
assertNull("no duplicate send: " + dup, dup);
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
// unsubscribe
connection.request(consumerInfo.createRemoveCommand());

View File

@ -72,4 +72,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception {
// pending acks are not tracked in leveldb
}
public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class AMQ7185Test
{
private final String xaDestinationName = "DestinationXA";
private BrokerService broker;
private String connectionUri;
private long txGenerator = System.currentTimeMillis();
private XAConnectionFactory xaConnectionFactory;
private ConnectionFactory connectionFactory;
final Topic dest = new ActiveMQTopic(xaDestinationName);
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
broker.setAdvisorySupport(false);
broker.addConnector("tcp://0.0.0.0:0?trace=true");
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
connectionFactory = new ActiveMQConnectionFactory(connectionUri);
((ActiveMQConnectionFactory) connectionFactory).setWatchTopicAdvisories(false);
// failover ensure audit is in play
xaConnectionFactory = new ActiveMQXAConnectionFactory("failover://" + connectionUri);
((ActiveMQXAConnectionFactory) xaConnectionFactory).setWatchTopicAdvisories(false);
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Test
public void testRollbackRedeliveryNoDup() throws Exception {
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
xaConnection.setClientID("cid0");
xaConnection.start();
XASession session = xaConnection.createXASession();
TopicSubscriber consumer = session.createDurableSubscriber(dest, "sub");
consumer.close();
session.close();
xaConnection.close();
publish(dest);
Xid tid;
TextMessage receivedMessage;
xaConnection = xaConnectionFactory.createXAConnection();
xaConnection.setClientID("cid0");
xaConnection.start();
session = xaConnection.createXASession();
consumer = session.createDurableSubscriber(dest, "sub");
tid = createXid();
XAResource resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
receivedMessage = (TextMessage) consumer.receive(4000);
assertNotNull(receivedMessage);
resource.end(tid, XAResource.TMSUCCESS);
resource.rollback(tid);
consumer.close();
session.close();
xaConnection.close();
// redelivery
xaConnection = xaConnectionFactory.createXAConnection();
xaConnection.setClientID("cid0");
xaConnection.start();
session = xaConnection.createXASession();
consumer = session.createDurableSubscriber(dest, "sub");
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
// verify only one
receivedMessage = (TextMessage) consumer.receiveNoWait();
assertNull(receivedMessage);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
consumer.close();
session.close();
xaConnection.close();
// assertNoMessageInDLQ
assertEquals("Only one enqueue", 1, broker.getAdminView().getTotalEnqueueCount());
}
private void publish(Topic dest) throws JMSException {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(dest).send(new ActiveMQTextMessage());
connection.close();
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
public int getFormatId() {
return 86;
}
public byte[] getGlobalTransactionId() {
return bs;
}
public byte[] getBranchQualifier() {
return bs;
}
};
}
}