https://issues.apache.org/jira/browse/AMQ-3305 - add RecoveredTransaction JMX MBean to allow administrative heuristic completion of pending xa transactions. handy in the event of the transaction manager having trouble

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1100288 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-05-06 17:13:56 +00:00
parent 101e7110bc
commit 1d242a2786
6 changed files with 247 additions and 9 deletions

View File

@ -28,6 +28,7 @@ import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
@ -98,14 +99,15 @@ public class TransactionBroker extends BrokerFilter {
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
try {
beginTransaction(context, xid);
Transaction transaction = getTransaction(context, xid, false);
XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
for (int i = 0; i < addedMessages.length; i++) {
kickDestinationOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
kickDestinationOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
}
transaction.setState(Transaction.PREPARED_STATE);
registerMBean(transaction);
LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
} catch (Throwable e) {
throw new WrappedException(e);
@ -119,8 +121,15 @@ public class TransactionBroker extends BrokerFilter {
next.start();
}
private void kickDestinationOnCompletion(ConnectionContext context, Transaction transaction,
ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
private void registerMBean(XATransaction transaction) {
if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
managedRegionBroker.registerRecoveredTransactionMBean(transaction);
}
}
private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
Destination destination = addDestination(context, amqDestination, false);
registerSync(destination, transaction, ack);
}

View File

@ -54,7 +54,6 @@ import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -67,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
@ -603,6 +603,45 @@ public class ManagedRegionBroker extends RegionBroker {
return objectName;
}
protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
+ "," + "Type=RecoveredXaTransaction"
+ "," + "Xid="
+ JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
return objectName;
}
public void registerRecoveredTransactionMBean(XATransaction transaction) {
try {
ObjectName objectName = createObjectName(transaction);
if (!registeredMBeans.contains(objectName)) {
RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
AnnotatedMBean.registerMBean(managementContext, view, objectName);
registeredMBeans.add(objectName);
}
} catch (Exception e) {
LOG.warn("Failed to register prepared transaction MBean: " + transaction);
LOG.debug("Failure reason: " + e, e);
}
}
public void unregister(XATransaction transaction) {
try {
ObjectName objectName = createObjectName(transaction);
if (registeredMBeans.remove(objectName)) {
try {
managementContext.unregisterMBean(objectName);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + objectName);
LOG.debug("Failure reason: " + e, e);
}
}
} catch (Exception e) {
LOG.warn("Failed to create object name to unregister " + transaction, e);
}
}
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.XATransaction;
public class RecoveredXATransactionView implements RecoveredXATransactionViewMBean {
private final XATransaction transaction;
public RecoveredXATransactionView(final ManagedRegionBroker managedRegionBroker, final XATransaction transaction) {
this.transaction = transaction;
transaction.addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
managedRegionBroker.unregister(transaction);
}
@Override
public void afterRollback() throws Exception {
managedRegionBroker.unregister(transaction);
}
});
}
@Override
public int getFormatId() {
return transaction.getXid().getFormatId();
}
@Override
public byte[] getBranchQualifier() {
return transaction.getXid().getBranchQualifier();
}
@Override
public byte[] getGlobalTransactionId() {
return transaction.getXid().getGlobalTransactionId();
}
@Override
public void heuristicCommit() throws Exception {
transaction.commit(false);
}
@Override
public void heuristicRollback() throws Exception {
transaction.rollback();
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
public interface RecoveredXATransactionViewMBean {
@MBeanInfo("The raw xid formatId.")
int getFormatId();
@MBeanInfo("The raw xid branchQualifier.")
byte[] getBranchQualifier();
@MBeanInfo("The raw xid globalTransactionId.")
byte[] getGlobalTransactionId();
@MBeanInfo("force heusistic commit of this transaction")
void heuristicCommit() throws Exception;
@MBeanInfo("force heusistic rollback of this transaction")
void heuristicRollback() throws Exception;
}

View File

@ -215,4 +215,8 @@ public class XATransaction extends Transaction {
public Logger getLog() {
return LOG;
}
public XATransactionId getXid() {
return xid;
}
}

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq.broker;
import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
@ -33,7 +36,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.v5.MessageMarshaller;
import org.apache.activemq.util.JMXSupport;
/**
* Used to simulate the recovery that occurs when a broker shuts down.
@ -42,6 +45,89 @@ import org.apache.activemq.openwire.v5.MessageMarshaller;
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
public void testPreparedJmxView() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
// restart the broker.
restartBroker();
connection = createConnection();
connectionInfo = createConnectionInfo();
connection.send(connectionInfo);
response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
TransactionId first = (TransactionId)dar.getData()[0];
// via jmx, force outcome
for (int i = 0; i < 4; i++) {
RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
if (i%2==0) {
mbean.heuristicCommit();
} else {
mbean.heuristicRollback();
}
}
// verify all completed
response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
dar = (DataArrayResponse)response;
assertEquals(0, dar.getData().length);
// verify mbeans gone
try {
RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first);
gone.heuristicRollback();
fail("Excepted not found");
} catch (InstanceNotFoundException expectedNotfound) {
}
}
private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
ObjectName objectName = new ObjectName("org.apache.activemq:Type=RecoveredXaTransaction,Xid=" +
JMXSupport.encodeObjectNamePart(xid.toString()) + ",BrokerName=localhost");
RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(objectName,
RecoveredXATransactionViewMBean.class, true);
return proxy;
}
public void testPreparedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();