From 98a6e42a57038e2a97d13c2cf1d3a70dd30f5ae1 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 4 Nov 2021 20:56:06 -0400 Subject: [PATCH] ARTEMIS-3554 Invalid Prepared Transaction could interrupt server reload --- .../AbstractJournalStorageManager.java | 396 ++++++++++-------- .../core/server/ActiveMQServerLogger.java | 8 + .../tests/integration/paging/PagingTest.java | 106 +++++ 3 files changed, 329 insertions(+), 181 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 10c33996bd..b7d3101510 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import javax.transaction.xa.Xid; @@ -1205,7 +1206,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp journalLoader.handleAddMessage(queueMap); - loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions, pendingLargeMessages, journalLoader); + loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader); for (PageSubscription sub : pageSubscriptions.values()) { sub.getCounter().processReload(); @@ -1236,6 +1237,22 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + private void failedToPrepareException(PreparedTransactionInfo txInfo, Throwable e) { + XidEncoding encodingXid = null; + try { + encodingXid = new XidEncoding(txInfo.getExtraData()); + } catch (Throwable ignored) { + } + + ActiveMQServerLogger.LOGGER.failedToLoadPreparedTX(e, String.valueOf(encodingXid != null ? 
encodingXid.xid : null));
+
+      try {
+         rollback(txInfo.getId());
+      } catch (Throwable e2) {
+         logger.warn(e.getMessage(), e2);
+      }
+   }
+
    private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) {
       Message message = MessagePersister.getInstance().decode(buff, null, pools, this);
       return message;
@@ -1702,197 +1719,214 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                                           final ResourceManager resourceManager,
                                           final Map<Long, QueueBindingInfo> queueInfos,
                                           final List<PreparedTransactionInfo> preparedTransactions,
-                                          final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                          final BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback,
                                           final Map<Long, PageSubscription> pageSubscriptions,
                                           final Set<Pair<Long, Long>> pendingLargeMessages,
                                           JournalLoader journalLoader) throws Exception {
       // recover prepared transactions
-      CoreMessageObjectPools pools = null;
+      final CoreMessageObjectPools pools = new CoreMessageObjectPools();
 
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
-         XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
-
-         Xid xid = encodingXid.xid;
-
-         Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);
-
-         List<MessageReference> referencesToAck = new ArrayList<>();
-
-         Map<Long, Message> messages = new HashMap<>();
-
-         // Use same method as load message journal to prune out acks, so they don't get added.
-         // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
-         // first get any sent messages for this tx and recreate
-         for (RecordInfo record : preparedTransaction.getRecords()) {
-            byte[] data = record.data;
-
-            ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-
-            byte recordType = record.getUserRecordType();
-
-            switch (recordType) {
-               case JournalRecordIds.ADD_LARGE_MESSAGE: {
-                  messages.put(record.id, parseLargeMessage(buff).toMessage());
-
-                  break;
-               }
-               case JournalRecordIds.ADD_MESSAGE: {
-
-                  break;
-               }
-               case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
-                  if (pools == null) {
-                     pools = new CoreMessageObjectPools();
-                  }
-                  Message message = decodeMessage(pools, buff);
-
-                  messages.put(record.id, message);
-
-                  break;
-               }
-               case JournalRecordIds.ADD_REF: {
-                  long messageID = record.id;
-
-                  RefEncoding encoding = new RefEncoding();
-
-                  encoding.decode(buff);
-
-                  Message message = messages.get(messageID);
-
-                  if (message == null) {
-                     throw new IllegalStateException("Cannot find message with id " + messageID);
-                  }
-
-                  journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID);
-
-                  break;
-               }
-               case JournalRecordIds.ACKNOWLEDGE_REF: {
-                  long messageID = record.id;
-
-                  RefEncoding encoding = new RefEncoding();
-
-                  encoding.decode(buff);
-
-                  journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID);
-
-                  break;
-               }
-               case JournalRecordIds.PAGE_TRANSACTION: {
-
-                  PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
-
-                  pageTransactionInfo.decode(buff);
-
-                  if (record.isUpdate) {
-                     PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
-                     if (pgTX != null) {
-                        pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
-                     }
-                  } else {
-                     pageTransactionInfo.setCommitted(false);
-
-                     tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
-
-                     pagingManager.addTransaction(pageTransactionInfo);
-
-                     tx.addOperation(new FinishPageMessageOperation());
-                  }
-
-                  break;
-               }
-               case SET_SCHEDULED_DELIVERY_TIME: {
-                  // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
-                  // case the message will already have the header for the scheduled delivery time, so no need to do
-                  // anything.
-
-                  break;
-               }
-               case DUPLICATE_ID: {
-                  // We need load the duplicate ids at prepare time too
-                  DuplicateIDEncoding encoding = new DuplicateIDEncoding();
-
-                  encoding.decode(buff);
-
-                  DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address);
-
-                  cache.load(tx, encoding.duplID);
-
-                  break;
-               }
-               case ACKNOWLEDGE_CURSOR: {
-                  CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
-                  encoding.decode(buff);
-
-                  encoding.position.setRecordID(record.id);
-
-                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
-
-                  if (sub != null) {
-                     sub.reloadPreparedACK(tx, encoding.position);
-                     referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub));
-                  } else {
-                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
-                  }
-                  break;
-               }
-               case PAGE_CURSOR_COUNTER_VALUE: {
-                  ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
-
-                  break;
-               }
-
-               case PAGE_CURSOR_COUNTER_INC: {
-                  PageCountRecordInc encoding = new PageCountRecordInc();
-
-                  encoding.decode(buff);
-
-                  PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
-
-                  if (sub != null) {
-                     sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
-                     sub.notEmpty();
-                  } else {
-                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
-                  }
-
-                  break;
-               }
-
-               default: {
-                  ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
-               }
+         try {
+            loadSinglePreparedTransaction(postOffice, pagingManager, resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, journalLoader, pools, preparedTransaction);
+         } catch (Throwable e) {
+            if (failedTransactionCallback != null) {
+               failedTransactionCallback.accept(preparedTransaction, e);
+            } else {
+               logger.warn(e.getMessage(), e);
             }
          }
-
-         for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) {
-            byte[] data = recordDeleted.data;
-
-            if (data.length > 0) {
-               ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-               byte b = buff.readByte();
-
-               switch (b) {
-                  case ADD_LARGE_MESSAGE_PENDING: {
-                     long messageID = buff.readLong();
-                     if (!pendingLargeMessages.remove(new Pair<>(recordDeleted.id, messageID))) {
-                        ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
-                     }
-                     installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
-                     break;
-                  }
-                  default:
-                     ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
-               }
-            }
-
-         }
-
-         journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager);
       }
    }
 
+   private void loadSinglePreparedTransaction(PostOffice postOffice,
+                                              PagingManager pagingManager,
+                                              ResourceManager resourceManager,
+                                              Map<Long, QueueBindingInfo> queueInfos,
+                                              Map<Long, PageSubscription> pageSubscriptions,
+                                              Set<Pair<Long, Long>> pendingLargeMessages,
+                                              JournalLoader journalLoader,
+                                              CoreMessageObjectPools pools,
+                                              PreparedTransactionInfo preparedTransaction) throws Exception {
+      XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
+
+      Xid xid = encodingXid.xid;
+
+      Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);
+
+      List<MessageReference> referencesToAck = new ArrayList<>();
+
+      Map<Long, Message> messages = new HashMap<>();
+
+      // Use same method as load message journal to prune out acks, so they don't get added.
+ // Then have reacknowledge(tx) methods on queue, which needs to add the page size + + // first get any sent messages for this tx and recreate + for (RecordInfo record : preparedTransaction.getRecords()) { + byte[] data = record.data; + + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + + byte recordType = record.getUserRecordType(); + + switch (recordType) { + case JournalRecordIds.ADD_LARGE_MESSAGE: { + messages.put(record.id, parseLargeMessage(buff).toMessage()); + + break; + } + case JournalRecordIds.ADD_MESSAGE: { + + break; + } + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + Message message = decodeMessage(pools, buff); + + messages.put(record.id, message); + + break; + } + case JournalRecordIds.ADD_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + Message message = messages.get(messageID); + + if (message == null) { + throw new IllegalStateException("Cannot find message with id " + messageID); + } + + journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID); + + break; + } + case JournalRecordIds.ACKNOWLEDGE_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID); + + break; + } + case JournalRecordIds.PAGE_TRANSACTION: { + + PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl(); + + pageTransactionInfo.decode(buff); + + if (record.isUpdate) { + PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID()); + if (pgTX != null) { + pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); + } + } else { + pageTransactionInfo.setCommitted(false); + + tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); + + pagingManager.addTransaction(pageTransactionInfo); + + tx.addOperation(new FinishPageMessageOperation()); + } + + break; + } + case SET_SCHEDULED_DELIVERY_TIME: { + // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which + // case the message will already have the header for the scheduled delivery time, so no need to do + // anything. 
+ + break; + } + case DUPLICATE_ID: { + // We need load the duplicate ids at prepare time too + DuplicateIDEncoding encoding = new DuplicateIDEncoding(); + + encoding.decode(buff); + + DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address); + + cache.load(tx, encoding.duplID); + + break; + } + case ACKNOWLEDGE_CURSOR: { + CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); + encoding.decode(buff); + + encoding.position.setRecordID(record.id); + + PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.reloadPreparedACK(tx, encoding.position); + referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub)); + } else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID); + } + break; + } + case PAGE_CURSOR_COUNTER_VALUE: { + ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared(); + + break; + } + + case PAGE_CURSOR_COUNTER_INC: { + PageCountRecordInc encoding = new PageCountRecordInc(); + + encoding.decode(buff); + + PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize()); + sub.notEmpty(); + } else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); + } + + break; + } + + default: { + ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType); + } + } + } + + for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) { + byte[] data = recordDeleted.data; + + if (data.length > 0) { + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + byte b = buff.readByte(); + + switch (b) { + case ADD_LARGE_MESSAGE_PENDING: { + long messageID = buff.readLong(); + if (!pendingLargeMessages.remove(new Pair<>(recordDeleted.id, messageID))) { + ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id); + } + installLargeMessageConfirmationOnTX(tx, recordDeleted.id); + break; + } + default: + ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b); + } + } + + } + + journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager); + } + OperationContext getContext(final boolean sync) { if (sync) { return getContext(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index ac75becdc8..27d3d3d7f4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1768,6 +1768,14 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void federationDispatchError(@Cause Throwable e, String message); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222306, value = "Failed to load prepared TX and it will be rolled back: {0}", + format = Message.Format.MESSAGE_FORMAT) + void failedToLoadPreparedTX(@Cause Throwable e, String message); + + + + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 29a66944b5..86de254cf4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -1314,6 +1314,112 @@ public class PagingTest extends ActiveMQTestBase { session.close(); } + + @Test + public void testPreparedACKRemoveAndRestart() throws Exception { + Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE); + + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 10; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + session.createQueue(new QueueConfiguration(PagingTest.ADDRESS)); + + Queue queue = server.locateQueue(PagingTest.ADDRESS); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + queue.getPageSubscription().getPagingStore().startPaging(); + + forcePage(queue); + + for (int i = 0; i < numberOfMessages; i++) { + ClientMessage message = session.createMessage(true); + + message.putIntProperty("count", i); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + + if (i == 4) { + session.commit(); + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + } + + session.commit(); + + session.close(); + + session = sf.createSession(true, false, false); + + + ClientConsumer cons = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 0; i <= 4; i++) { + Xid xidConsumeNoCommit = newXID(); + session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS); + // First message is consumed, prepared, will be rolled back later + ClientMessage firstMessageConsumed = cons.receive(5000); + assertNotNull(firstMessageConsumed); + firstMessageConsumed.acknowledge(); + session.end(xidConsumeNoCommit, XAResource.TMSUCCESS); + session.prepare(xidConsumeNoCommit); + } + + File pagingFolder = queue.getPageSubscription().getPagingStore().getFolder(); + + server.stop(); + + // remove the very first page. a restart should not fail + File fileToRemove = new File(pagingFolder, "000000001.page"); + Assert.assertTrue(fileToRemove.delete()); + + server.start(); + + sf = createSessionFactory(locator); + + session = sf.createSession(false, true, true); + + cons = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 5; i < numberOfMessages; i++) { + ClientMessage message = cons.receive(1000); + assertNotNull(message); + assertEquals(i, message.getIntProperty("count").intValue()); + message.acknowledge(); + } + assertNull(cons.receiveImmediate()); + session.commit(); + } + /** * @param queue * @throws InterruptedException
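
Note: the behavioural change above reduces to a fail-soft loading loop: each prepared transaction is loaded on its own, and any failure is handed to an optional callback (which logs the broken TX and rolls it back) instead of aborting the whole server reload. The standalone sketch below illustrates that pattern with plain java.util.function.BiConsumer; it is not part of the patch and not Artemis API, and the class and method names are hypothetical.

import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class FailSoftReloadSketch {

   // Stand-in for loadSinglePreparedTransaction(..): rejects a malformed entry.
   static void loadOne(String encodedTx) {
      if (encodedTx == null || encodedTx.isEmpty()) {
         throw new IllegalStateException("corrupt prepared TX record");
      }
      // ... decode records, register sends/acks on the transaction, etc.
   }

   // Load every prepared TX; a single bad one is handed to the callback
   // (log + rollback in the real code) and the loop keeps going, so one
   // invalid entry no longer interrupts the whole reload.
   static void loadAll(List<String> encodedTxs, BiConsumer<String, Throwable> onFailure) {
      for (String tx : encodedTxs) {
         try {
            loadOne(tx);
         } catch (Throwable t) {
            if (onFailure != null) {
               onFailure.accept(tx, t);
            } else {
               System.err.println("skipping prepared TX: " + t.getMessage());
            }
         }
      }
   }

   public static void main(String[] args) {
      loadAll(Arrays.asList("tx-1", "", "tx-3"),
              (tx, err) -> System.err.println("rolled back '" + tx + "': " + err.getMessage()));
   }
}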