ARTEMIS-3554 Invalid Prepared Transaction could interrupt server reload
This commit is contained in:
parent
4367ec40a6
commit
98a6e42a57
|
@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import javax.transaction.xa.Xid;
|
||||
|
||||
|
@ -1205,7 +1206,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
journalLoader.handleAddMessage(queueMap);
|
||||
|
||||
loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions, pendingLargeMessages, journalLoader);
|
||||
loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader);
|
||||
|
||||
for (PageSubscription sub : pageSubscriptions.values()) {
|
||||
sub.getCounter().processReload();
|
||||
|
@ -1236,6 +1237,22 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
}
|
||||
}
|
||||
|
||||
private void failedToPrepareException(PreparedTransactionInfo txInfo, Throwable e) {
|
||||
XidEncoding encodingXid = null;
|
||||
try {
|
||||
encodingXid = new XidEncoding(txInfo.getExtraData());
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
|
||||
ActiveMQServerLogger.LOGGER.failedToLoadPreparedTX(e, String.valueOf(encodingXid != null ? encodingXid.xid : null));
|
||||
|
||||
try {
|
||||
rollback(txInfo.getId());
|
||||
} catch (Throwable e2) {
|
||||
logger.warn(e.getMessage(), e2);
|
||||
}
|
||||
}
|
||||
|
||||
private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) {
|
||||
Message message = MessagePersister.getInstance().decode(buff, null, pools, this);
|
||||
return message;
|
||||
|
@ -1702,14 +1719,35 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
final ResourceManager resourceManager,
|
||||
final Map<Long, QueueBindingInfo> queueInfos,
|
||||
final List<PreparedTransactionInfo> preparedTransactions,
|
||||
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
|
||||
final BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback,
|
||||
final Map<Long, PageSubscription> pageSubscriptions,
|
||||
final Set<Pair<Long, Long>> pendingLargeMessages,
|
||||
JournalLoader journalLoader) throws Exception {
|
||||
// recover prepared transactions
|
||||
CoreMessageObjectPools pools = null;
|
||||
final CoreMessageObjectPools pools = new CoreMessageObjectPools();
|
||||
|
||||
for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
|
||||
try {
|
||||
loadSinglePreparedTransaction(postOffice, pagingManager, resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, journalLoader, pools, preparedTransaction);
|
||||
} catch (Throwable e) {
|
||||
if (failedTransactionCallback != null) {
|
||||
failedTransactionCallback.accept(preparedTransaction, e);
|
||||
} else {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void loadSinglePreparedTransaction(PostOffice postOffice,
|
||||
PagingManager pagingManager,
|
||||
ResourceManager resourceManager,
|
||||
Map<Long, QueueBindingInfo> queueInfos,
|
||||
Map<Long, PageSubscription> pageSubscriptions,
|
||||
Set<Pair<Long, Long>> pendingLargeMessages,
|
||||
JournalLoader journalLoader,
|
||||
CoreMessageObjectPools pools,
|
||||
PreparedTransactionInfo preparedTransaction) throws Exception {
|
||||
XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
|
||||
|
||||
Xid xid = encodingXid.xid;
|
||||
|
@ -1742,9 +1780,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
break;
|
||||
}
|
||||
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
|
||||
if (pools == null) {
|
||||
pools = new CoreMessageObjectPools();
|
||||
}
|
||||
Message message = decodeMessage(pools, buff);
|
||||
|
||||
messages.put(record.id, message);
|
||||
|
@ -1891,7 +1926,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager);
|
||||
}
|
||||
}
|
||||
|
||||
OperationContext getContext(final boolean sync) {
|
||||
if (sync) {
|
||||
|
|
|
@ -1768,6 +1768,14 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void federationDispatchError(@Cause Throwable e, String message);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222306, value = "Failed to load prepared TX and it will be rolled back: {0}",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToLoadPreparedTX(@Cause Throwable e, String message);
|
||||
|
||||
|
||||
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||
void initializationError(@Cause Throwable e);
|
||||
|
|
|
@ -1314,6 +1314,112 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
session.close();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPreparedACKRemoveAndRestart() throws Exception {
|
||||
Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
|
||||
server.start();
|
||||
|
||||
final int numberOfMessages = 10;
|
||||
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
|
||||
|
||||
Queue queue = server.locateQueue(PagingTest.ADDRESS);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
byte[] body = new byte[MESSAGE_SIZE];
|
||||
|
||||
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||
|
||||
for (int j = 1; j <= MESSAGE_SIZE; j++) {
|
||||
bb.put(getSamplebyte(j));
|
||||
}
|
||||
|
||||
queue.getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
forcePage(queue);
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
ClientMessage message = session.createMessage(true);
|
||||
|
||||
message.putIntProperty("count", i);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
producer.send(message);
|
||||
|
||||
if (i == 4) {
|
||||
session.commit();
|
||||
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||
}
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
session.close();
|
||||
|
||||
session = sf.createSession(true, false, false);
|
||||
|
||||
|
||||
ClientConsumer cons = session.createConsumer(ADDRESS);
|
||||
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i <= 4; i++) {
|
||||
Xid xidConsumeNoCommit = newXID();
|
||||
session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS);
|
||||
// First message is consumed, prepared, will be rolled back later
|
||||
ClientMessage firstMessageConsumed = cons.receive(5000);
|
||||
assertNotNull(firstMessageConsumed);
|
||||
firstMessageConsumed.acknowledge();
|
||||
session.end(xidConsumeNoCommit, XAResource.TMSUCCESS);
|
||||
session.prepare(xidConsumeNoCommit);
|
||||
}
|
||||
|
||||
File pagingFolder = queue.getPageSubscription().getPagingStore().getFolder();
|
||||
|
||||
server.stop();
|
||||
|
||||
// remove the very first page. a restart should not fail
|
||||
File fileToRemove = new File(pagingFolder, "000000001.page");
|
||||
Assert.assertTrue(fileToRemove.delete());
|
||||
|
||||
server.start();
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
session = sf.createSession(false, true, true);
|
||||
|
||||
cons = session.createConsumer(ADDRESS);
|
||||
|
||||
session.start();
|
||||
|
||||
for (int i = 5; i < numberOfMessages; i++) {
|
||||
ClientMessage message = cons.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals(i, message.getIntProperty("count").intValue());
|
||||
message.acknowledge();
|
||||
}
|
||||
assertNull(cons.receiveImmediate());
|
||||
session.commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param queue
|
||||
* @throws InterruptedException
|
||||
|
|
Loading…
Reference in New Issue