fix up destination statistics for recovered transactions, pending adds are not visible, but pending acks are still accounted for in the messages count, commit/rollback updates enqueues/dequeues/messages as expected - https://issues.apache.org/jira/browse/AMQ-3872, https://issues.apache.org/jira/browse/AMQ-3305 - both kahadb and jdbc suffered

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1353024 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-22 20:43:24 +00:00
parent 11497e9e98
commit 60624c4e38
5 changed files with 44 additions and 11 deletions

View File

@ -30,7 +30,6 @@ import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker; import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BaseCommand; import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
@ -139,19 +138,28 @@ public class TransactionBroker extends BrokerFilter {
private void registerSync(Destination destination, Transaction transaction, BaseCommand command) { private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage()); Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
// ensure one per destination in the list // ensure one per destination in the list
transaction.removeSynchronization(sync); Synchronization existing = transaction.findMatching(sync);
transaction.addSynchronization(sync); if (existing != null) {
((PreparedDestinationCompletion)existing).incrementOpCount();
} else {
transaction.addSynchronization(sync);
}
} }
static class PreparedDestinationCompletion extends Synchronization { static class PreparedDestinationCompletion extends Synchronization {
final Destination destination; final Destination destination;
final boolean messageSend; final boolean messageSend;
int opCount = 1;
public PreparedDestinationCompletion(final Destination destination, boolean messageSend) { public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
this.destination = destination; this.destination = destination;
// rollback relevant to acks, commit to sends // rollback relevant to acks, commit to sends
this.messageSend = messageSend; this.messageSend = messageSend;
} }
public void incrementOpCount() {
opCount++;
}
@Override @Override
public int hashCode() { public int hashCode() {
return System.identityHashCode(destination) + return System.identityHashCode(destination) +
@ -179,9 +187,14 @@ public class TransactionBroker extends BrokerFilter {
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
if (messageSend) { if (messageSend) {
destination.clearPendingMessages(); destination.clearPendingMessages();
destination.getDestinationStatistics().getEnqueues().add(opCount);
destination.getDestinationStatistics().getMessages().add(opCount);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterCommit : " + destination); LOG.debug("cleared pending from afterCommit : " + destination);
} }
} else {
destination.getDestinationStatistics().getDequeues().add(opCount);
destination.getDestinationStatistics().getMessages().subtract(opCount);
} }
} }
} }

View File

@ -17,16 +17,10 @@
package org.apache.activemq.store.jdbc; package org.apache.activemq.store.jdbc;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -37,11 +31,9 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore; import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream; import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.SubscriptionKey;
/** /**
* respect 2pc prepare * respect 2pc prepare
@ -305,6 +297,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination()); JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority()); jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
lastAckCommand.setMessageStore(jdbcTopicMessageStore); lastAckCommand.setMessageStore(jdbcTopicMessageStore);
} else {
// when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
// but the sql is non portable to match BLOB with LIKE etc
// so we make up for it when we recover the ack
((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
} }
} }
} }

View File

@ -354,6 +354,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setLong(1, seq); s.setLong(1, seq);
} else { } else {
byte[] xidVal = xid.getEncodedXidBytes(); byte[] xidVal = xid.getEncodedXidBytes();
xidVal[0] = '-';
setBinaryData(s, 1, xidVal); setBinaryData(s, 1, xidVal);
s.setLong(2, seq); s.setLong(2, seq);
} }

View File

@ -71,6 +71,14 @@ public abstract class Transaction {
} }
} }
public Synchronization findMatching(Synchronization r) {
int existing = synchronizations.indexOf(r);
if (existing != -1) {
return synchronizations.get(existing);
}
return null;
}
public void removeSynchronization(Synchronization r) { public void removeSynchronization(Synchronization r) {
synchronizations.remove(r); synchronizations.remove(r);
} }

View File

@ -108,11 +108,13 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize()); assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
TransactionId first = (TransactionId)dar.getData()[0]; TransactionId first = (TransactionId)dar.getData()[0];
int commitCount = 0;
// via jmx, force outcome // via jmx, force outcome
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]); RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
if (i%2==0) { if (i%2==0) {
mbean.heuristicCommit(); mbean.heuristicCommit();
commitCount++;
} else { } else {
mbean.heuristicRollback(); mbean.heuristicRollback();
} }
@ -124,6 +126,9 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
dar = (DataArrayResponse)response; dar = (DataArrayResponse)response;
assertEquals(0, dar.getData().length); assertEquals(0, dar.getData().length);
// verify messages available
assertEquals("enqueue count reflects outcome", commitCount, destinationView.getQueueSize());
// verify mbeans gone // verify mbeans gone
try { try {
RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first); RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first);
@ -547,11 +552,20 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertNull(m); assertNull(m);
assertNoMessagesLeft(connection); assertNoMessagesLeft(connection);
// validate destination depth via jmx
DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize());
assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount());
connection.request(createCommitTransaction2Phase(connectionInfo, txid)); connection.request(createCommitTransaction2Phase(connectionInfo, txid));
// validate recovery complete // validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize());
assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount());
} }
public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {