AMQ-7311 - track recovered prepared ack locations on a per subscriber basis, fix and test

This commit is contained in:
gtully 2019-09-26 15:54:57 +01:00
parent 1c5beda834
commit ed5edb03d7
7 changed files with 351 additions and 20 deletions

View File

@ -294,7 +294,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
@Override
public void rollback(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId);
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0, jdbcTopicMessageStore.getDestination(), subName, clientId);
jdbcTopicMessageStore.complete(clientId, subName);
}

View File

@ -1008,8 +1008,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
String encodedString = rs.getString(1);
byte[] encodedXid = parseBase64Binary(encodedString);
String destination = rs.getString(2);
String subName = rs.getString(3);
String subId = rs.getString(4);
String subId = rs.getString(3);
String subName = rs.getString(4);
jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
subName, subId);

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -419,8 +420,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
protected final Set<String> ackedAndPrepared = new HashSet<>();
protected final Set<String> rolledBackAcks = new HashSet<>();
protected final HashMap<String, Set<String>> ackedAndPreparedMap = new HashMap<String, Set<String>>();
protected final HashMap<String, Set<String>> rolledBackAcksMap = new HashMap<String, Set<String>>();
double doneTasks, canceledTasks = 0;
@ -437,6 +438,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
}
private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) {
return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId();
}
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
// 'at most once' XA guarantee
@ -444,6 +449,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
final String key = recoveredTxStateMapKey(destination, ack);
Set ackedAndPrepared = ackedAndPreparedMap.get(key);
if (ackedAndPrepared == null) {
ackedAndPrepared = new LinkedHashSet<String>();
ackedAndPreparedMap.put(key, ackedAndPrepared);
}
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
@ -457,8 +468,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
try {
for (MessageAck ack : acks) {
final String id = ack.getLastMessageId().toProducerKey();
final String key = recoveredTxStateMapKey(destination, ack);
Set ackedAndPrepared = ackedAndPreparedMap.get(key);
if (ackedAndPrepared != null) {
ackedAndPrepared.remove(id);
if (ackedAndPreparedMap.isEmpty()) {
ackedAndPreparedMap.remove(key);
}
}
if (rollback) {
Set rolledBackAcks = rolledBackAcksMap.get(key);
if (rolledBackAcks == null) {
rolledBackAcks = new LinkedHashSet<String>();
rolledBackAcksMap.put(key, rolledBackAcks);
}
rolledBackAcks.add(id);
pageFile.tx().execute(tx -> {
incrementAndAddSizeToStoreStat(tx, dest, 0);
@ -646,12 +669,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
.hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg = loadMessage(entry.getValue().location);
@ -673,10 +697,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg = loadMessage(entry.getValue().location);
@ -695,9 +720,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
}
}
protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
int counter = 0;
String id;
Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey);
if (rolledBackAcks == null) {
return counter;
}
for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
id = iterator.next();
iterator.remove();
@ -710,12 +740,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
break;
}
} else {
LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
LOG.debug("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
}
} else {
LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
}
}
if (rolledBackAcks.isEmpty()) {
rolledBackAcksMap.remove(recoveredTxStateMapKey);
}
return counter;
}
@ -830,7 +863,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
return statistics;
}
});
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
if (ackedAndPrepared != null) {
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
}
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
} finally {
@ -1113,11 +1149,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
subAckPositions = null;
sd.orderIndex.setBatch(tx, cursorPos);
}
recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the sequence set contains the message still
@ -1173,11 +1210,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
}
Entry<Long, MessageKeys> entry = null;
int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the sequence set contains the message still

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@ -359,6 +360,10 @@ public class KahaDBTransactionStore implements TransactionStore {
MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op;
Buffer ackb = rmOp.getCommand().getAck();
MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
// allow the ack to be tracked back to its durable sub
ConsumerId subKey = new ConsumerId();
subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey());
ack.setConsumerId(subKey);
ackList.add(ack);
}
}

View File

@ -713,7 +713,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
}
public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
@ -793,6 +793,97 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// setup durable subs
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("sub");
connection.send(consumerInfo);
ConsumerInfo consumerInfoX = createConsumerInfo(sessionInfo, destination);
consumerInfoX.setSubscriptionName("subX");
connection.send(consumerInfoX);
connection.send(consumerInfoX.createRemoveCommand());
final int numMessages = 4;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
final int messageCount = expectedMessageCount(numMessages, destination);
Message m = null;
for (int i = 0; i < messageCount; i++) {
m = receiveMessage(connection);
assertNotNull("unexpected null on: " + i, m);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
connection = createConnection();
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
connection.send(connectionInfo);
// validate recovery
TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
sessionInfo = createSessionInfo(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("sub");
connection.send(consumerInfo);
// no redelivery, exactly once semantics unless there is rollback
m = receiveMessage(connection);
assertNull(m);
assertNoMessagesLeft(connection);
// ensure subX can get it's copy of the messages
consumerInfoX = createConsumerInfo(sessionInfo, destination);
consumerInfoX.setSubscriptionName("subX");
connection.send(consumerInfoX);
for (int i = 0; i < messageCount; i++) {
m = receiveMessage(connection);
assertNotNull("unexpected null for subX on: " + i, m);
}
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
ActiveMQDestination destination = createDestination();

View File

@ -74,4 +74,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
}
public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
}
public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() throws Exception {
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
@ -407,6 +408,182 @@ public class XACompletionTest extends TestSupport {
}
@Test
public void testConsumeAfterAckPreparedRolledbackTopic() throws Exception {
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
final ActiveMQTopic destination = new ActiveMQTopic("TEST");
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.setClientID("durable");
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
MessageConsumer consumer = xaSession.createDurableSubscriber(destination, "sub1");
consumer.close();
consumer = xaSession.createDurableSubscriber(destination, "sub2");
sendMessagesTo(10, destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
dumpMessages();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
for (int i = 0; i < 5; i++) {
Message message = null;
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
consumer.close();
activeMQXAConnection.close();
LOG.info("after close");
broker = restartBroker();
LOG.info("Try consume... after restart");
dumpMessages();
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
xaSession = activeMQXAConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
LOG.info("Rollback outcome for ack");
xaResource.rollback(xids[0]);
assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", destination));
assertTrue("got expected", consumeOnlyN(10, "durable", "sub2", destination));
}
@Test
public void testConsumeAfterAckPreparedCommitTopic() throws Exception {
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
final ActiveMQTopic destination = new ActiveMQTopic("TEST");
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.setClientID("durable");
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
MessageConsumer consumer = xaSession.createDurableSubscriber(destination, "sub1");
consumer.close();
consumer = xaSession.createDurableSubscriber(destination, "sub2");
sendMessagesTo(10, destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
dumpMessages();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
for (int i = 0; i < 5; i++) {
Message message = null;
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
consumer.close();
activeMQXAConnection.close();
LOG.info("after close");
broker = restartBroker();
LOG.info("Try consume... after restart");
dumpMessages();
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
xaSession = activeMQXAConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
LOG.info("Rollback outcome for ack");
xaResource.commit(xids[0], false);
assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", destination));
assertTrue("got expected", consumeOnlyN(5, "durable", "sub2", destination));
LOG.info("at end...");
dumpMessages();
}
private boolean consumeOnlyN(int expected, String clientId, String subName, ActiveMQTopic destination) throws Exception {
int drained = 0;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + expected);
factory.setWatchTopicAdvisories(false);
javax.jms.Connection connection = factory.createConnection();
connection.setClientID(clientId);
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(destination, subName);
Message message = null;
while ( (message =consumer.receive(2000)) != null) {
drained++;
LOG.info("Sub:" + subName + ", received: " + message.getJMSMessageID());
}
consumer.close();
} finally {
connection.close();
}
return drained == expected;
}
@Test
public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception {
@ -938,16 +1115,24 @@ public class XACompletionTest extends TestSupport {
}
protected void sendMessages(int messagesExpected) throws Exception {
sendMessagesTo(messagesExpected, new ActiveMQQueue("TEST"));
}
protected void sendMessagesTo(int messagesExpected, Destination destination) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(connectionUri);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
sendMessagesWith(activeMQConnectionFactory, messagesExpected);
sendMessagesWithTo(activeMQConnectionFactory, messagesExpected, destination);
}
protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception {
sendMessagesWithTo(factory, messagesExpected, new ActiveMQQueue("TEST"));
}
protected void sendMessagesWithTo(ConnectionFactory factory, int messagesExpected, Destination destination) throws Exception {
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -975,6 +1160,15 @@ public class XACompletionTest extends TestSupport {
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
}
statement.close();
statement = conn.prepareStatement("SELECT LAST_ACKED_ID, CLIENT_ID, SUB_NAME, PRIORITY, XID FROM ACTIVEMQ_ACKS");
result = statement.executeQuery();
LOG.info("Messages in ACKS table db...");
while (result.next()) {
LOG.info("lastAcked: {}, clientId: {}, SUB_NAME: {}, PRIORITY: {}, XID {}",
result.getLong(1), result.getString(2), result.getString(3), result.getInt(4), result.getString(5));
}
statement.close();
conn.close();
}
@ -1011,6 +1205,7 @@ public class XACompletionTest extends TestSupport {
DestinationMap destinationMap = new DestinationMap();
GroupPrincipal anaGroup = new GroupPrincipal(id);
destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup);
destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQTopic(">")}), anaGroup);
map.setWriteACLs(destinationMap);
map.setAdminACLs(destinationMap);
map.setReadACLs(destinationMap);