diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 4d25c58a24..29c8f53977 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1581,6 +1581,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT) void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT) + void journalMessageAckMissingQueueInPreparedTX(Long queueID); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 9385be3acd..f9ec9647aa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -304,15 +304,14 @@ public class PostOfficeJournalLoader implements JournalLoader { Queue queue = queues.get(queueID); if (queue == null) { - throw new IllegalStateException("Cannot find queue with id " + queueID); - } - - MessageReference removed = queue.removeReferenceWithID(messageID); - - if (removed == null) { - ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID); + ActiveMQServerLogger.LOGGER.journalMessageAckMissingQueueInPreparedTX(queueID); } else { - referencesToAck.add(removed); + MessageReference removed = queue.removeReferenceWithID(messageID); + if (removed == null) { + ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID); + } else { + referencesToAck.add(removed); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index 07cacf241b..04fd1a98e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -213,6 +213,37 @@ public class BasicXaTest extends ActiveMQTestBase { assertNotNull(message); } + @Test + public void testRestartWithTXPrepareDeletedQueue() throws Exception { + + ClientSession clientSession2 = sessionFactory.createSession(false, true, true); + ClientProducer clientProducer = clientSession2.createProducer(atestq); + ClientMessage m1 = createTextMessage(clientSession2, "m1"); + clientProducer.send(m1); + + Xid xid = newXID(); + clientSession.start(xid, XAResource.TMNOFLAGS); + clientSession.start(); + + ClientConsumer clientConsumer = clientSession.createConsumer(atestq); + + ClientMessage message = clientConsumer.receive(5000); + message.acknowledge(); + clientSession.end(xid, XAResource.TMSUCCESS); + clientSession.prepare(xid); + + clientSession.getSessionFactory().getConnection().destroy(); + + messagingService.destroyQueue(atestq); + + messagingService.stop(); + messagingService.start(); + + messagingService.waitForActivation(10, TimeUnit.SECONDS); + + assertTrue(messagingService.isStarted()); + } + @Test public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception { Xid xid = newXID();