This commit is contained in:
Clebert Suconic 2018-03-22 16:21:17 -04:00
commit 7628236453
3 changed files with 42 additions and 8 deletions

View File

@ -1581,6 +1581,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
void journalMessageAckMissingQueueInPreparedTX(Long queueID);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -304,15 +304,14 @@ public class PostOfficeJournalLoader implements JournalLoader {
Queue queue = queues.get(queueID);
if (queue == null) {
throw new IllegalStateException("Cannot find queue with id " + queueID);
}
MessageReference removed = queue.removeReferenceWithID(messageID);
if (removed == null) {
ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID);
ActiveMQServerLogger.LOGGER.journalMessageAckMissingQueueInPreparedTX(queueID);
} else {
referencesToAck.add(removed);
MessageReference removed = queue.removeReferenceWithID(messageID);
if (removed == null) {
ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID);
} else {
referencesToAck.add(removed);
}
}
}

View File

@ -213,6 +213,37 @@ public class BasicXaTest extends ActiveMQTestBase {
assertNotNull(message);
}
@Test
public void testRestartWithTXPrepareDeletedQueue() throws Exception {
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
clientProducer.send(m1);
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage message = clientConsumer.receive(5000);
message.acknowledge();
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
clientSession.getSessionFactory().getConnection().destroy();
messagingService.destroyQueue(atestq);
messagingService.stop();
messagingService.start();
messagingService.waitForActivation(10, TimeUnit.SECONDS);
assertTrue(messagingService.isStarted());
}
@Test
public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception {
Xid xid = newXID();