diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6e0688b464..886695b810 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2230,18 +2230,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } private boolean shouldForward(JournalCommand command) { - boolean result = false; - if (command != null) { - if (command instanceof KahaRemoveMessageCommand) { - result = true; - } else if (command instanceof KahaCommitCommand) { - KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command; - if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) { - result = true; - } - } + if (command == null) { + return false; } - return result; + + return (command instanceof KahaRemoveMessageCommand || command instanceof KahaCommitCommand); } private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java index 1b2219ea62..5da378e73b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java @@ -526,6 +526,82 @@ public class AMQ7067Test { } } + @Test + public void testForwardAcksAndCommitsWithLocalTransaction() throws Exception { + ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2); + final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue holdKahaDb = session.createQueue("holdKahaDb"); + MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb); + TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10)); + holdKahaDbProducer.send(helloMessage); + session.commit(); + Queue queue = session.createQueue("test"); + + for (int i = 0; i < 5; i++) { + produce(connection, queue, 60, 512 * 1024); + consume(connection, queue, 30, true); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 150 == getQueueSize(queue.getQueueName()); + } + }); + + // force gc, n data files requires n cycles + int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1; + for (int dataFilesToMove = 0; dataFilesToMove < 10; dataFilesToMove++) { + for (int i = 0; i < limit; i++) { + broker.getPersistenceAdapter().checkpoint(true); + } + // ack compaction task operates in the background + TimeUnit.SECONDS.sleep(2); + } + + session.commit(); + + connection.close(); + curruptIndexFile(getDataDirectory()); + + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + broker.waitUntilStarted(); + + while(true) { + try { + TimeUnit.SECONDS.sleep(1); + System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); + break; + } catch (Exception ex) { + System.out.println(ex.getMessage()); + break; + } + } + + assertEquals(1, getQueueSize(holdKahaDb.getQueueName())); + assertEquals(150, getQueueSize(queue.getQueueName())); + } + + private static void consume(Connection connection, Queue queue, int messageCount, boolean transacted) throws JMSException { + final Session session = connection.createSession(transacted, transacted ? 0 : Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(queue); + + int messagesConsumed = 0; + + while (consumer.receive(1000) != null && messagesConsumed < messageCount) { + messagesConsumed++; + session.commit(); + } + + System.out.println(messagesConsumed + " messages consumed from " + queue.getQueueName()); + session.close(); + } + protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException { MessageProducer producer = xaSession.createProducer(queue); XATransactionId txId = createXATransaction();