ARTEMIS-1764 Ignore Ack for Prepared Tx when Queue deleted
This commit is contained in:
parent
d8f22a399b
commit
f8547aecf1
|
@ -1581,6 +1581,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
|
void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void journalMessageAckMissingQueueInPreparedTX(Long queueID);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void initializationError(@Cause Throwable e);
|
void initializationError(@Cause Throwable e);
|
||||||
|
|
|
@ -304,15 +304,14 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
Queue queue = queues.get(queueID);
|
Queue queue = queues.get(queueID);
|
||||||
|
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
throw new IllegalStateException("Cannot find queue with id " + queueID);
|
ActiveMQServerLogger.LOGGER.journalMessageAckMissingQueueInPreparedTX(queueID);
|
||||||
}
|
|
||||||
|
|
||||||
MessageReference removed = queue.removeReferenceWithID(messageID);
|
|
||||||
|
|
||||||
if (removed == null) {
|
|
||||||
ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID);
|
|
||||||
} else {
|
} else {
|
||||||
referencesToAck.add(removed);
|
MessageReference removed = queue.removeReferenceWithID(messageID);
|
||||||
|
if (removed == null) {
|
||||||
|
ActiveMQServerLogger.LOGGER.journalErrorRemovingRef(messageID);
|
||||||
|
} else {
|
||||||
|
referencesToAck.add(removed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -213,6 +213,37 @@ public class BasicXaTest extends ActiveMQTestBase {
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestartWithTXPrepareDeletedQueue() throws Exception {
|
||||||
|
|
||||||
|
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
|
||||||
|
ClientProducer clientProducer = clientSession2.createProducer(atestq);
|
||||||
|
ClientMessage m1 = createTextMessage(clientSession2, "m1");
|
||||||
|
clientProducer.send(m1);
|
||||||
|
|
||||||
|
Xid xid = newXID();
|
||||||
|
clientSession.start(xid, XAResource.TMNOFLAGS);
|
||||||
|
clientSession.start();
|
||||||
|
|
||||||
|
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
|
||||||
|
|
||||||
|
ClientMessage message = clientConsumer.receive(5000);
|
||||||
|
message.acknowledge();
|
||||||
|
clientSession.end(xid, XAResource.TMSUCCESS);
|
||||||
|
clientSession.prepare(xid);
|
||||||
|
|
||||||
|
clientSession.getSessionFactory().getConnection().destroy();
|
||||||
|
|
||||||
|
messagingService.destroyQueue(atestq);
|
||||||
|
|
||||||
|
messagingService.stop();
|
||||||
|
messagingService.start();
|
||||||
|
|
||||||
|
messagingService.waitForActivation(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertTrue(messagingService.isStarted());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception {
|
public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception {
|
||||||
Xid xid = newXID();
|
Xid xid = newXID();
|
||||||
|
|
Loading…
Reference in New Issue