This commit is contained in:
Wei Yang 2020-01-23 11:23:16 +08:00
commit 20ca545555
2 changed files with 65 additions and 1 deletions

View File

@ -113,7 +113,8 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
}
}
if (pagingManager != null) {
pagingManager.removeTransaction(this.transactionID);
}
return false;
@ -242,6 +243,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
if (lateDeliveries != null) {
for (LateDelivery pos : lateDeliveries) {
pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
onUpdate(1, null, pos.getSubscription().getPagingStore().getPagingManager());
}
lateDeliveries = null;
}
@ -283,6 +285,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos);
}
cursor.positionIgnored(cursorPos);
onUpdate(1, null, cursor.getPagingStore().getPagingManager());
return true;
} else {
if (logger.isTraceEnabled()) {

View File

@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@ -6838,6 +6839,66 @@ public class PagingTest extends ActiveMQTestBase {
}
}
@Test
public void testRollbackPageTransactionBeforeDelivery() throws Exception {
testRollbackPageTransaction(true);
}
@Test
public void testRollbackPageTransactionAfterDelivery() throws Exception {
testRollbackPageTransaction(false);
}
private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 2;
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
Queue queue = server.locateQueue(PagingTest.ADDRESS);
queue.getPageSubscription().getPagingStore().startPaging();
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
if (rollbackBeforeDelivery) {
sendMessages(session, producer, numberOfMessages);
session.rollback();
assertEquals(server.getPagingManager().getTransactions().size(), 1);
PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next();
// Make sure rollback happens before delivering messages
Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
Assert.assertNull(consumer.receiveImmediate());
assertTrue(server.getPagingManager().getTransactions().isEmpty());
} else {
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
sendMessages(session, producer, numberOfMessages);
Assert.assertNull(consumer.receiveImmediate());
assertEquals(server.getPagingManager().getTransactions().size(), 1);
PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next();
session.rollback();
Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
assertTrue(server.getPagingManager().getTransactions().isEmpty());
}
session.close();
}
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);