diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index bd0ebec0d8..e4a4c7c167 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -30,7 +30,6 @@ import javax.transaction.xa.XAException; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.jmx.ManagedRegionBroker; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BaseCommand; import org.apache.activemq.command.ConnectionInfo; @@ -139,19 +138,28 @@ public class TransactionBroker extends BrokerFilter { private void registerSync(Destination destination, Transaction transaction, BaseCommand command) { Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage()); // ensure one per destination in the list - transaction.removeSynchronization(sync); - transaction.addSynchronization(sync); + Synchronization existing = transaction.findMatching(sync); + if (existing != null) { + ((PreparedDestinationCompletion)existing).incrementOpCount(); + } else { + transaction.addSynchronization(sync); + } } static class PreparedDestinationCompletion extends Synchronization { final Destination destination; final boolean messageSend; + int opCount = 1; public PreparedDestinationCompletion(final Destination destination, boolean messageSend) { this.destination = destination; // rollback relevant to acks, commit to sends this.messageSend = messageSend; } + public void incrementOpCount() { + opCount++; + } + @Override public int hashCode() { return System.identityHashCode(destination) + @@ -179,9 +187,14 @@ public class TransactionBroker extends BrokerFilter { public void afterCommit() throws Exception { if (messageSend) { destination.clearPendingMessages(); + destination.getDestinationStatistics().getEnqueues().add(opCount); + destination.getDestinationStatistics().getMessages().add(opCount); if (LOG.isDebugEnabled()) { LOG.debug("cleared pending from afterCommit : " + destination); } + } else { + destination.getDestinationStatistics().getDequeues().add(opCount); + destination.getDestinationStatistics().getMessages().subtract(opCount); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 46f71b5490..785c61a316 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -17,16 +17,10 @@ package org.apache.activemq.store.jdbc; import java.io.IOException; -import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DurableTopicSubscription; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -37,11 +31,9 @@ import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; -import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; import org.apache.activemq.store.memory.MemoryTransactionStore; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.DataByteArrayInputStream; -import org.apache.activemq.util.SubscriptionKey; /** * respect 2pc prepare @@ -305,6 +297,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination()); jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority()); lastAckCommand.setMessageStore(jdbcTopicMessageStore); + } else { + // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction), + // but the sql is non portable to match BLOB with LIKE etc + // so we make up for it when we recover the ack + ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment(); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index fabc0522d5..2a28662ce5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -354,6 +354,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s.setLong(1, seq); } else { byte[] xidVal = xid.getEncodedXidBytes(); + xidVal[0] = '-'; setBinaryData(s, 1, xidVal); s.setLong(2, seq); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java index 4a6a69b689..60b74f82aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java +++ b/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java @@ -71,6 +71,14 @@ public abstract class Transaction { } } + public Synchronization findMatching(Synchronization r) { + int existing = synchronizations.indexOf(r); + if (existing != -1) { + return synchronizations.get(existing); + } + return null; + } + public void removeSynchronization(Synchronization r) { synchronizations.remove(r); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 878f19f7a5..9932097425 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -108,11 +108,13 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize()); TransactionId first = (TransactionId)dar.getData()[0]; + int commitCount = 0; // via jmx, force outcome for (int i = 0; i < 4; i++) { RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]); if (i%2==0) { mbean.heuristicCommit(); + commitCount++; } else { mbean.heuristicRollback(); } @@ -124,6 +126,9 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { dar = (DataArrayResponse)response; assertEquals(0, dar.getData().length); + // verify messages available + assertEquals("enqueue count reflects outcome", commitCount, destinationView.getQueueSize()); + // verify mbeans gone try { RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first); @@ -547,11 +552,20 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertNull(m); assertNoMessagesLeft(connection); + // validate destination depth via jmx + DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]); + assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize()); + assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount()); + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); // validate recovery complete dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + + assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize()); + assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount()); + } public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {