ARTEMIS-2855 Define a new broker plugin to track XA transactions

This commit is contained in:
brusdev 2020-07-28 14:57:31 +02:00 committed by Clebert Suconic
parent 6070074603
commit 18b8df0f09
16 changed files with 352 additions and 31 deletions

View File

@ -1312,7 +1312,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
logger.trace("xarollback into " + tx + " sending tx back as it was suspended");
}
// Put it back
resourceManager.putTransaction(xid, tx);
resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
ex.errorCode = XAException.XAER_PROTO;
throw ex;
@ -1466,7 +1466,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} else {
if (tx.getState() == Transaction.State.SUSPENDED) {
// Put it back
resourceManager.putTransaction(xid, tx);
resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
ex.errorCode = XAException.XAER_PROTO;
throw ex;
@ -1732,11 +1732,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
server.getStorageManager().clearContext();
}
private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {
return lookupTX(txID, session, false);
}
private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException {
private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws Exception {
if (txID == null) {
return null;
}
@ -1745,7 +1745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction transaction;
if (txID.isXATransaction()) {
xid = OpenWireUtil.toXID(txID);
transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid);
transaction = remove ? server.getResourceManager().removeTransaction(xid, this) : server.getResourceManager().getTransaction(xid);
} else {
transaction = remove ? txMap.remove(txID) : txMap.get(txID);
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@ -1303,4 +1304,8 @@ public interface Configuration {
*/
List<FederationConfiguration> getFederationConfigurations();
/**
* @return
*/
List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
}

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@ -283,6 +284,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerFederationPlugin> brokerFederationPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerResourcePlugin> brokerResourcePlugins = new CopyOnWriteArrayList<>();
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
@ -1540,6 +1542,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
}
if (plugin instanceof ActiveMQServerResourcePlugin) {
brokerResourcePlugins.add((ActiveMQServerResourcePlugin) plugin);
}
}
@Override
@ -1575,6 +1580,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.remove(plugin);
}
if (plugin instanceof ActiveMQServerResourcePlugin) {
brokerResourcePlugins.remove(plugin);
}
}
@Override
@ -1637,6 +1645,11 @@ public class ConfigurationImpl implements Configuration, Serializable {
return federationConfigurations;
}
@Override
public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
return brokerResourcePlugins;
}
@Override
public File getBrokerInstance() {
if (artemisInstance != null) {

View File

@ -2043,7 +2043,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid);
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.commit(false);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
storageManager.waitOnOperations();
@ -2071,7 +2071,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid);
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.rollback();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
server.getStorageManager().waitOnOperations();

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -260,6 +261,8 @@ public interface ActiveMQServer extends ServiceComponent {
List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins();
List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException;
@ -282,6 +285,8 @@ public interface ActiveMQServer extends ServiceComponent {
void callBrokerFederationPlugins(ActiveMQPluginRunnable<ActiveMQServerFederationPlugin> pluginRun) throws ActiveMQException;
void callBrokerResourcePlugins(ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException;
boolean hasBrokerPlugins();
boolean hasBrokerConnectionPlugins();
@ -304,6 +309,8 @@ public interface ActiveMQServer extends ServiceComponent {
boolean hasBrokerFederationPlugins();
boolean hasBrokerResourcePlugins();
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,

View File

@ -2104,4 +2104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224105, value = "Connecting to cluster failed")
void failedConnectingToCluster(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224106, value = "failed to remove transaction, xid:{0}", format = Message.Format.MESSAGE_FORMAT)
void errorRemovingTX(@Cause Exception e, Xid xid);
}

View File

@ -165,6 +165,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@ -2378,6 +2379,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return configuration.getBrokerFederationPlugins();
}
@Override
public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
return configuration.getBrokerResourcePlugins();
}
@Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
callBrokerPlugins(getBrokerPlugins(), pluginRun);
@ -2433,6 +2439,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callBrokerPlugins(getBrokerFederationPlugins(), pluginRun);
}
@Override
public void callBrokerResourcePlugins(final ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException {
callBrokerPlugins(getBrokerResourcePlugins(), pluginRun);
}
private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
if (pluginRun != null) {
for (P plugin : plugins) {
@ -2505,6 +2516,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return !getBrokerFederationPlugins().isEmpty();
}
@Override
public boolean hasBrokerResourcePlugins() {
return !getBrokerResourcePlugins().isEmpty();
}
@Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;
@ -2879,7 +2895,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
pagingManager = createPagingManager();
resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
resourceManager = new ResourceManagerImpl(this, (int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
/**
* If there is no plugin configured we don't want to instantiate a MetricsManager. This keeps the dependency

View File

@ -335,7 +335,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
tx.setState(Transaction.State.PREPARED);
resourceManager.putTransaction(xid, tx);
resourceManager.putTransaction(xid, tx, null);
}
/**

View File

@ -401,7 +401,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Transaction txToRollback = tx;
if (txToRollback != null) {
if (txToRollback.getXid() != null) {
resourceManager.removeTransaction(txToRollback.getXid());
resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
}
txToRollback.rollbackIfPossible();
}
@ -410,7 +410,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (txToRollback != null) {
if (txToRollback.getXid() != null) {
resourceManager.removeTransaction(txToRollback.getXid());
resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
}
txToRollback.rollbackIfPossible();
}
@ -1352,7 +1352,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
} else {
Transaction theTx = resourceManager.removeTransaction(xid);
Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
if (logger.isTraceEnabled()) {
logger.trace("XAcommit into " + theTx + ", xid=" + xid);
@ -1375,7 +1375,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} else {
if (theTx.getState() == Transaction.State.SUSPENDED) {
// Put it back
resourceManager.putTransaction(xid, theTx);
resourceManager.putTransaction(xid, theTx, remotingConnection);
throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
} else {
@ -1497,7 +1497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
} else {
Transaction theTx = resourceManager.removeTransaction(xid);
Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
if (logger.isTraceEnabled()) {
logger.trace("xarollback into " + theTx);
}
@ -1532,7 +1532,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
// Put it back
resourceManager.putTransaction(xid, tx);
resourceManager.putTransaction(xid, tx, remotingConnection);
throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot rollback transaction, it is suspended " + xid);
} else {
@ -1551,7 +1551,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (tx.getState() != Transaction.State.PREPARED) {
// we don't want to rollback anything prepared here
if (tx.getXid() != null) {
resourceManager.removeTransaction(tx.getXid());
resourceManager.removeTransaction(tx.getXid(), remotingConnection);
}
tx.rollback();
}
@ -1566,7 +1566,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
logger.trace("xastart into tx= " + tx);
}
boolean added = resourceManager.putTransaction(xid, tx);
boolean added = resourceManager.putTransaction(xid, tx, remotingConnection);
if (!added) {
final String msg = "Cannot start, there is already a xid " + tx.getXid();
@ -1581,7 +1581,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (theTX == null) {
theTX = newTransaction(xid);
resourceManager.putTransaction(xid, theTX);
resourceManager.putTransaction(xid, theTX, remotingConnection);
}
if (theTX.isEffective()) {

View File

@ -31,5 +31,6 @@ public interface ActiveMQServerPlugin extends
ActiveMQServerMessagePlugin,
ActiveMQServerBridgePlugin,
ActiveMQServerCriticalPlugin,
ActiveMQServerFederationPlugin {
ActiveMQServerFederationPlugin,
ActiveMQServerResourcePlugin {
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
/**
*
*/
public interface ActiveMQServerResourcePlugin extends ActiveMQServerBasePlugin {
/**
* Before a transaction is put
*
* @param xid
* @param tx
* @param remotingConnection
* @throws ActiveMQException
*/
default void beforePutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
}
/**
* After a transaction is put
*
* @param xid
* @param tx
* @param remotingConnection
* @throws ActiveMQException
*/
default void afterPutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
}
/**
* Before a transaction is removed
*
* @param xid
* @param remotingConnection
* @throws ActiveMQException
*/
default void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
}
/**
* After a transaction is removed
*
* @param xid
* @param remotingConnection
* @throws ActiveMQException
*/
default void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
}
}

View File

@ -20,15 +20,17 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public interface ResourceManager extends ActiveMQComponent {
boolean putTransaction(Xid xid, Transaction tx);
boolean putTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException;
Transaction getTransaction(Xid xid);
Transaction removeTransaction(Xid xid);
Transaction removeTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException;
int getTimeoutSeconds();

View File

@ -31,9 +31,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class ResourceManagerImpl implements ResourceManager {
@ -51,9 +54,13 @@ public class ResourceManagerImpl implements ResourceManager {
private final ScheduledExecutorService scheduledThreadPool;
public ResourceManagerImpl(final int defaultTimeoutSeconds,
private final ActiveMQServer server;
public ResourceManagerImpl(final ActiveMQServer server,
final int defaultTimeoutSeconds,
final long txTimeoutScanPeriod,
final ScheduledExecutorService scheduledThreadPool) {
this.server = server;
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
this.scheduledThreadPool = scheduledThreadPool;
@ -103,13 +110,33 @@ public class ResourceManagerImpl implements ResourceManager {
}
@Override
public boolean putTransaction(final Xid xid, final Transaction tx) {
return transactions.putIfAbsent(xid, tx) == null;
public boolean putTransaction(final Xid xid, final Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
if (server.hasBrokerResourcePlugins()) {
server.callBrokerResourcePlugins(plugin -> plugin.beforePutTransaction(xid, tx, remotingConnection));
}
boolean result = transactions.putIfAbsent(xid, tx) == null;
if (server.hasBrokerResourcePlugins()) {
server.callBrokerResourcePlugins(plugin -> plugin.afterPutTransaction(xid, tx, remotingConnection));
}
return result;
}
@Override
public Transaction removeTransaction(final Xid xid) {
return transactions.remove(xid);
public Transaction removeTransaction(final Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
if (server.hasBrokerResourcePlugins()) {
server.callBrokerResourcePlugins(plugin -> plugin.beforeRemoveTransaction(xid, remotingConnection));
}
Transaction transaction = transactions.remove(xid);
if (server.hasBrokerResourcePlugins()) {
server.callBrokerResourcePlugins(plugin -> plugin.afterRemoveTransaction(xid, remotingConnection));
}
return transaction;
}
@Override
@ -209,7 +236,12 @@ public class ResourceManagerImpl implements ResourceManager {
for (Transaction tx : transactions.values()) {
if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
Transaction removedTX = removeTransaction(tx.getXid());
Transaction removedTX = null;
try {
removedTX = removeTransaction(tx.getXid(), null);
} catch (ActiveMQException e) {
ActiveMQServerLogger.LOGGER.errorRemovingTX(e, tx.getXid());
}
if (removedTX != null) {
ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());
timedoutTransactions.add(removedTX);

View File

@ -320,7 +320,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
tx = server.getResourceManager().removeTransaction(xid);
tx = server.getResourceManager().removeTransaction(xid, null);
assertNotNull(tx);

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ResourceBrokerPluginTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(ResourceBrokerPluginTest.class);
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
private ActiveMQServer server;
private ClientSession clientSession;
private ClientSessionFactory sessionFactory;
private Configuration configuration;
private ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
configuration = createDefaultNettyConfig();
server = createServer(true, configuration, -1, -1, addressSettings);
// start the server
server.start();
locator = createNettyNonHALocator();
sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
}
@Test
public void testXATransaction() throws Exception {
final CountDownLatch latch = new CountDownLatch(4);
final Xid clientXid = new XidImpl("XA_TEST".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
@Override
public void beforePutTransaction(Xid xid,
Transaction tx,
RemotingConnection remotingConnection) throws ActiveMQException {
Assert.assertEquals(clientXid, xid);
latch.countDown();
}
@Override
public void afterPutTransaction(Xid xid,
Transaction tx,
RemotingConnection remotingConnection) throws ActiveMQException {
Assert.assertEquals(clientXid, xid);
latch.countDown();
}
@Override
public void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
Assert.assertEquals(clientXid, xid);
latch.countDown();
}
@Override
public void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
Assert.assertEquals(clientXid, xid);
latch.countDown();
}
});
clientSession.start(clientXid, XAResource.TMNOFLAGS);
clientSession.end(clientXid, XAResource.TMSUCCESS);
clientSession.commit(clientXid, true);
Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
}
@Test
public void testXAClientsMisconfiguration() throws Exception {
// https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/FormatConstants.java#L30
final int JTA_FORMAT_ID = 131077;
final CountDownLatch latch = new CountDownLatch(1);
ClientSessionFactory sessionFactoryEx = createSessionFactory(locator);
ClientSession clientSessionEx = sessionFactoryEx.createSession(true, false, false);
server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
private final ConcurrentMap<String, String> clients = new ConcurrentHashMap<>();
@Override
public void afterPutTransaction(Xid xid,
Transaction tx,
RemotingConnection remotingConnection) throws ActiveMQException {
if (xid.getFormatId() == JTA_FORMAT_ID) {
// https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaJTA/jta/classes/com/arjuna/ats/jta/xa/XATxConverter.java#L188
String nodeName = new String(Arrays.copyOfRange(xid.getGlobalTransactionId(),28, xid.getGlobalTransactionId().length), StandardCharsets.UTF_8);
String clientAddress = clients.putIfAbsent(nodeName, remotingConnection.getRemoteAddress());
if (clientAddress != null && !clientAddress.equals(remotingConnection.getRemoteAddress())) {
latch.countDown();
logger.warn("Possible XA client misconfiguration. Two addresses with the same node name " +
nodeName + ": " + clientAddress + "/" + remotingConnection.getRemoteAddress());
}
}
}
});
Xid xid = new XidImpl("XA_TEST".getBytes(), JTA_FORMAT_ID, UUIDGenerator.getInstance().generateStringUUID().getBytes());
clientSession.start(xid, XAResource.TMNOFLAGS);
byte[] getGlobalTransactionIdEx = xid.getGlobalTransactionId().clone();
getGlobalTransactionIdEx[0] = (byte)(getGlobalTransactionIdEx[0] + 1);
Xid xidEx = new XidImpl(xid.getBranchQualifier(), JTA_FORMAT_ID, getGlobalTransactionIdEx);
clientSessionEx.start(xidEx, XAResource.TMNOFLAGS);
Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
}
}

View File

@ -102,7 +102,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>();
FakePagingManager pagingManager = new FakePagingManager();
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(0, mapDups.size());
@ -118,7 +118,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(1, mapDups.size());
@ -141,7 +141,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(1, mapDups.size());