mirror of https://github.com/apache/activemq.git
make perpared xa transactions visible in kahadb persistenceadapter view mbean
This commit is contained in:
parent
e8818fafea
commit
69c0d399fb
|
@ -21,7 +21,7 @@ public interface PersistenceAdapterViewMBean {
|
||||||
@MBeanInfo("Name of this persistence adapter.")
|
@MBeanInfo("Name of this persistence adapter.")
|
||||||
String getName();
|
String getName();
|
||||||
|
|
||||||
@MBeanInfo("Current inflight local transactions.")
|
@MBeanInfo("Inflight transactions.")
|
||||||
String getTransactions();
|
String getTransactions();
|
||||||
|
|
||||||
@MBeanInfo("Current data.")
|
@MBeanInfo("Current data.")
|
||||||
|
|
|
@ -563,6 +563,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
synchronized (preparedTransactions) {
|
||||||
|
if (!preparedTransactions.isEmpty()) {
|
||||||
|
for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
|
||||||
|
TranInfo info = new TranInfo();
|
||||||
|
info.id = entry.getKey();
|
||||||
|
for (Operation operation : entry.getValue()) {
|
||||||
|
info.track(operation);
|
||||||
|
}
|
||||||
|
infos.add(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return infos.toString();
|
return infos.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2290,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
|
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
|
||||||
protected final Set<String> ackedAndPrepared = new HashSet<String>();
|
protected final Set<String> ackedAndPrepared = new HashSet<String>();
|
||||||
|
protected final Set<String> rolledBackAcks = new HashSet<String>();
|
||||||
|
|
||||||
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
|
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
|
||||||
// till then they are skipped by the store.
|
// till then they are skipped by the store.
|
||||||
|
@ -2305,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
|
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
|
||||||
if (acks != null) {
|
if (acks != null) {
|
||||||
this.indexLock.writeLock().lock();
|
this.indexLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
for (MessageAck ack : acks) {
|
for (MessageAck ack : acks) {
|
||||||
ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
|
final String id = ack.getLastMessageId().toProducerKey();
|
||||||
|
ackedAndPrepared.remove(id);
|
||||||
|
if (rollback) {
|
||||||
|
rolledBackAcks.add(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.indexLock.writeLock().unlock();
|
this.indexLock.writeLock().unlock();
|
||||||
|
@ -2933,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
return lastGetPriority;
|
return lastGetPriority;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean alreadyDispatched(Long sequence) {
|
||||||
|
return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
|
||||||
|
(cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
|
||||||
|
(cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
|
||||||
|
}
|
||||||
|
|
||||||
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
|
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
|
||||||
Iterator<Entry<Long, MessageKeys>>currentIterator;
|
Iterator<Entry<Long, MessageKeys>>currentIterator;
|
||||||
final Iterator<Entry<Long, MessageKeys>>highIterator;
|
final Iterator<Entry<Long, MessageKeys>>highIterator;
|
||||||
|
|
|
@ -25,10 +25,13 @@ import javax.management.InstanceNotFoundException;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
import junit.framework.Test;
|
import junit.framework.Test;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||||
|
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
|
||||||
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
|
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.command.*;
|
import org.apache.activemq.command.*;
|
||||||
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
import org.apache.activemq.util.JMXSupport;
|
import org.apache.activemq.util.JMXSupport;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -77,6 +80,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
DataArrayResponse dar = (DataArrayResponse)response;
|
DataArrayResponse dar = (DataArrayResponse)response;
|
||||||
assertEquals(4, dar.getData().length);
|
assertEquals(4, dar.getData().length);
|
||||||
|
|
||||||
|
// view prepared in kahadb view
|
||||||
|
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
|
||||||
|
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
|
||||||
|
String txFromView = kahadbView.getTransactions();
|
||||||
|
LOG.info("Tx view fromm PA:" + txFromView);
|
||||||
|
assertTrue("xid with our dud format in transaction string " + txFromView, txFromView.contains("XID:[55,"));
|
||||||
|
}
|
||||||
|
|
||||||
// restart the broker.
|
// restart the broker.
|
||||||
restartBroker();
|
restartBroker();
|
||||||
|
|
||||||
|
@ -125,6 +136,12 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
return (PersistenceAdapterViewMBean)broker.getManagementContext().newProxyInstance(
|
||||||
|
BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(), name),
|
||||||
|
PersistenceAdapterViewMBean.class, true);
|
||||||
|
}
|
||||||
|
|
||||||
private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
|
private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
|
||||||
|
|
||||||
ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,Xid=" +
|
ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,Xid=" +
|
||||||
|
@ -216,7 +233,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
|
|
||||||
// Commit the prepared transactions.
|
// Commit the prepared transactions.
|
||||||
for (int i = 0; i < dar.getData().length; i++) {
|
for (int i = 0; i < dar.getData().length; i++) {
|
||||||
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
|
connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We should get the committed transactions.
|
// We should get the committed transactions.
|
||||||
|
@ -304,7 +321,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
|
|
||||||
// Commit the prepared transactions.
|
// Commit the prepared transactions.
|
||||||
for (int i = 0; i < dar.getData().length; i++) {
|
for (int i = 0; i < dar.getData().length; i++) {
|
||||||
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
|
connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We should get the committed transactions.
|
// We should get the committed transactions.
|
||||||
|
@ -1057,7 +1074,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
|
||||||
}
|
}
|
||||||
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
|
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
|
||||||
ack.setTransactionId(txid);
|
ack.setTransactionId(txid);
|
||||||
connection.send(ack);
|
connection.request(ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't commit
|
// Don't commit
|
||||||
|
|
Loading…
Reference in New Issue