AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction

This commit is contained in:
Jonathan Gallimore 2021-03-23 10:38:55 +00:00 committed by Gary Tully
parent 09389d1ef3
commit edd0515d90
2 changed files with 80 additions and 11 deletions

View File

@ -2230,18 +2230,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
private boolean shouldForward(JournalCommand<?> command) {
boolean result = false;
if (command != null) {
if (command instanceof KahaRemoveMessageCommand) {
result = true;
} else if (command instanceof KahaCommitCommand) {
KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
result = true;
if (command == null) {
return false;
}
}
}
return result;
return (command instanceof KahaRemoveMessageCommand || command instanceof KahaCommitCommand);
}
private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {

View File

@ -526,6 +526,82 @@ public class AMQ7067Test {
}
}
@Test
public void testForwardAcksAndCommitsWithLocalTransaction() throws Exception {
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2);
final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue holdKahaDb = session.createQueue("holdKahaDb");
MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
holdKahaDbProducer.send(helloMessage);
session.commit();
Queue queue = session.createQueue("test");
for (int i = 0; i < 5; i++) {
produce(connection, queue, 60, 512 * 1024);
consume(connection, queue, 30, true);
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 150 == getQueueSize(queue.getQueueName());
}
});
// force gc, n data files requires n cycles
int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
for (int dataFilesToMove = 0; dataFilesToMove < 10; dataFilesToMove++) {
for (int i = 0; i < limit; i++) {
broker.getPersistenceAdapter().checkpoint(true);
}
// ack compaction task operates in the background
TimeUnit.SECONDS.sleep(2);
}
session.commit();
connection.close();
curruptIndexFile(getDataDirectory());
broker.stop();
broker.waitUntilStopped();
createBroker();
broker.waitUntilStarted();
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
break;
} catch (Exception ex) {
System.out.println(ex.getMessage());
break;
}
}
assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
assertEquals(150, getQueueSize(queue.getQueueName()));
}
private static void consume(Connection connection, Queue queue, int messageCount, boolean transacted) throws JMSException {
final Session session = connection.createSession(transacted, transacted ? 0 : Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(queue);
int messagesConsumed = 0;
while (consumer.receive(1000) != null && messagesConsumed < messageCount) {
messagesConsumed++;
session.commit();
}
System.out.println(messagesConsumed + " messages consumed from " + queue.getQueueName());
session.close();
}
protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
MessageProducer producer = xaSession.createProducer(queue);
XATransactionId txId = createXATransaction();