ARTEMIS-302 more changes around XA reliability (resilience on failures)

This commit is contained in:
Clebert Suconic 2015-12-14 20:11:53 -05:00
parent a2c8e6bc3b
commit af1f79bff5
22 changed files with 1387 additions and 353 deletions

View File

@ -539,8 +539,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
rollback(false);
}
@Override
public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException {
public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException
{
rollback(isLastMessageAsDelivered, true);
}
public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException
{
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
}
@ -559,7 +564,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : cloneConsumers()) {
consumer.clear(true);
consumer.clear(waitConsumers);
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@ -1173,7 +1178,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
try {
if (rollbackOnly) {
try {
rollback();
rollback(false, false);
}
catch (Throwable ignored) {
ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored);
@ -1252,6 +1257,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return sessionContext.configureTransactionTimeout(seconds);
}
catch (Throwable t) {
markRollbackOnly(); // The TM will ignore any errors from here, if things are this screwed up we mark rollbackonly
// This could occur if the TM interrupts the thread
XAException xaException = new XAException(XAException.XAER_RMFAIL);
xaException.initCause(t);

View File

@ -74,20 +74,20 @@ public interface ActiveMQRALogger extends BasicLogger {
void awaitingJMSServerCreation();
@LogMessage(level = Logger.Level.INFO)
@Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT)
void rebalancingConnections();
@Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections on even {0}.", format = Message.Format.MESSAGE_FORMAT)
void rebalancingConnections(String event);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT)
void problemResettingXASession();
void problemResettingXASession(@Cause Throwable t);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 152002, value = "Unable to roll local transaction back", format = Message.Format.MESSAGE_FORMAT)
void unableToRollbackTX();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 152003, value = "unable to reset session after failure", format = Message.Format.MESSAGE_FORMAT)
void unableToResetSession();
@Message(id = 152003, value = "unable to reset session after failure, we will place the MDB Inflow now in setup mode for activation={0}" , format = Message.Format.MESSAGE_FORMAT)
void unableToResetSession(String spec, @Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 152004, value = "Handling JMS exception failure", format = Message.Format.MESSAGE_FORMAT)

View File

@ -811,7 +811,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
private void createCF() {
if (connectionFactory == null) {
connectionFactory = ra.createActiveMQConnectionFactory(mcf.getProperties());
connectionFactory = ra.getConnectionFactory(mcf.getProperties());
}
}

View File

@ -21,8 +21,8 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ActiveMQXAResource;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
/**
* ActiveMQXAResource.
@ -76,13 +76,18 @@ public class ActiveMQRAXAResource implements ActiveMQXAResource {
ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
try {
//this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this
sessionInternal.resetIfNeeded();
}
catch (ActiveMQException e) {
ActiveMQRALogger.LOGGER.problemResettingXASession();
}
try {
try {
//this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this
sessionInternal.resetIfNeeded();
}
catch (ActiveMQException e) {
ActiveMQRALogger.LOGGER.problemResettingXASession(e);
XAException xaException = new XAException(XAException.XAER_RMFAIL);
xaException.initCause(e);
throw xaException;
}
xaResource.start(xid, flags);
}
finally {

View File

@ -29,11 +29,12 @@ import javax.transaction.xa.XAResource;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@ -141,7 +142,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties = new ActiveMQRAProperties();
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap<>();
activations = Collections.synchronizedMap(new IdentityHashMap<ActivationSpec, ActiveMQActivation>());
recoveryManager = new RecoveryManager();
}
@ -1570,7 +1571,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
*/
protected void setup() throws ActiveMQException {
raProperties.init();
defaultActiveMQConnectionFactory = createActiveMQConnectionFactory(raProperties);
defaultActiveMQConnectionFactory = newConnectionFactory(raProperties);
recoveryActiveMQConnectionFactory = createRecoveryActiveMQConnectionFactory(raProperties);
Map<String, String> recoveryConfProps = new HashMap<>();
@ -1623,112 +1624,12 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
}
public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
public synchronized ActiveMQConnectionFactory getConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf;
boolean known = false;
if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
if (ha == null) {
ha = ActiveMQClient.DEFAULT_IS_HA;
}
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
BroadcastEndpointFactory endpointFactory = null;
if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
}
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
}
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
if (initialTimeout == null) {
initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams;
if (overrideProperties.getParsedConnectorClassNames() != null) {
connectionParams = overrideProperties.getParsedConnectionParameters();
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
for (int i = 0; i < connectorClassName.size(); i++) {
TransportConfiguration tc;
if (connectionParams == null || i >= connectionParams.size()) {
tc = new TransportConfiguration(connectorClassName.get(i));
ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
}
else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
}
transportConfigurations[i] = tc;
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
}
}
else {
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
}
setParams(cf, overrideProperties);
cf = newConnectionFactory(overrideProperties);
knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
}
else {
@ -1740,12 +1641,119 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
if (known && cf.getServerLocator().isClosed()) {
knownConnectionFactories.remove(overrideProperties);
cf = createActiveMQConnectionFactory(overrideProperties);
cf = newConnectionFactory(overrideProperties);
knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
}
return cf;
}
public ActiveMQConnectionFactory newConnectionFactory(ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf;
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
if (ha == null) {
ha = ActiveMQClient.DEFAULT_IS_HA;
}
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
BroadcastEndpointFactory endpointFactory = null;
if (jgroupsLocatorClassName != null) {
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null) {
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
if (discoveryPort == null) {
discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
}
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
}
else if (jgroupsFileName != null) {
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null) {
refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
if (initialTimeout == null) {
initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
else if (connectorClassName != null) {
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
List<Map<String, Object>> connectionParams;
if (overrideProperties.getParsedConnectorClassNames() != null) {
connectionParams = overrideProperties.getParsedConnectionParameters();
}
else {
connectionParams = raProperties.getParsedConnectionParameters();
}
for (int i = 0; i < connectorClassName.size(); i++) {
TransportConfiguration tc;
if (connectionParams == null || i >= connectionParams.size()) {
tc = new TransportConfiguration(connectorClassName.get(i));
ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
}
else {
tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
}
transportConfigurations[i] = tc;
}
if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
Arrays.toString(transportConfigurations) + " with ha=" + ha);
}
if (ha) {
cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
}
else {
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
}
}
else {
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
}
setParams(cf, overrideProperties);
return cf;
}
public ActiveMQConnectionFactory createRecoveryActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf;
List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();

View File

@ -48,7 +48,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.ra.ActiveMQRABundle;
@ -420,12 +419,19 @@ public class ActiveMQActivation {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
if (threadTearDown.isAlive()) {
if (factory != null) {
// This will interrupt any threads waiting on reconnect
if (factory != null) {
try {
// closing the factory will help making sure pending threads are closed
factory.close();
factory = null;
}
catch (Throwable e) {
ActiveMQRALogger.LOGGER.warn(e);
}
factory = null;
}
if (threadTearDown.isAlive()) {
threadTearDown.interrupt();
try {
@ -440,11 +446,6 @@ public class ActiveMQActivation {
}
}
if (spec.isHasBeenUpdated() && factory != null) {
ra.closeConnectionFactory(spec);
factory = null;
}
nodes.clear();
lastReceived = false;
@ -465,23 +466,11 @@ public class ActiveMQActivation {
factory = (ActiveMQConnectionFactory) fac;
}
else {
ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac;
if (spec.isHasBeenUpdated()) {
factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec);
}
else {
factory = raFact.getDefaultFactory();
if (factory != ra.getDefaultActiveMQConnectionFactory()) {
ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory();
}
}
factory = ra.newConnectionFactory(spec);
}
}
else if (spec.isHasBeenUpdated()) {
factory = ra.createActiveMQConnectionFactory(spec);
}
else {
factory = ra.getDefaultActiveMQConnectionFactory();
factory = ra.newConnectionFactory(spec);
}
}
@ -627,9 +616,18 @@ public class ActiveMQActivation {
return buffer.toString();
}
public void rebalance() {
ActiveMQRALogger.LOGGER.rebalancingConnections();
reconnect(null);
public void startReconnectThread(final String threadName) {
if (trace) {
ActiveMQRALogger.LOGGER.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this);
}
Runnable runnable = new Runnable() {
@Override
public void run() {
reconnect(null);
}
};
Thread t = new Thread(runnable, threadName);
t.start();
}
/**
@ -638,6 +636,9 @@ public class ActiveMQActivation {
* @param failure if reconnecting in the event of a failure
*/
public void reconnect(Throwable failure) {
if (trace) {
ActiveMQRALogger.LOGGER.trace("reconnecting activation " + this);
}
if (failure != null) {
if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
@ -728,6 +729,7 @@ public class ActiveMQActivation {
}
private class RebalancingListener implements ClusterTopologyListener {
@Override
public void nodeUP(TopologyMember member, boolean last) {
boolean newNode = false;
@ -741,14 +743,8 @@ public class ActiveMQActivation {
}
if (lastReceived && newNode) {
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeUP Connection Rebalancer");
t.start();
ActiveMQRALogger.LOGGER.rebalancingConnections("nodeUp " + member.toString());
startReconnectThread("NodeUP Connection Rebalancer");
}
else if (last) {
lastReceived = true;
@ -759,14 +755,8 @@ public class ActiveMQActivation {
public void nodeDown(long eventUID, String nodeID) {
if (nodes.remove(nodeID)) {
removedNodes.put(nodeID, eventUID);
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer");
t.start();
ActiveMQRALogger.LOGGER.rebalancingConnections("nodeDown " + nodeID);
startReconnectThread("NodeDOWN Connection Rebalancer");
}
}
}

View File

@ -386,7 +386,8 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
session.resetIfNeeded();
}
catch (ActiveMQException e) {
ActiveMQRALogger.LOGGER.unableToResetSession();
ActiveMQRALogger.LOGGER.unableToResetSession(activation.toString(), e);
activation.startReconnectThread("Reset MessageHandler after Failure Thread");
}
}

View File

@ -27,6 +27,7 @@ import java.security.MessageDigest;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -2924,7 +2925,7 @@ public class JournalStorageManager implements StorageManager {
@Override
public String toString() {
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]";
}
private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {

View File

@ -28,11 +28,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
@ -218,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return bytes;
}
// for tests
public ClientSessionFactory getSessionFactory() {
return csf;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages()
*/
@ -905,8 +912,24 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
scheduleRetryConnect();
}
}
catch (ActiveMQInterruptedException e) {
ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
}
catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
if (csf != null) {
try {
csf.close();
csf = null;
}
catch (Throwable ignored) {
}
}
fail(false);
scheduleRetryConnect();
}
}
}

View File

@ -285,7 +285,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
if (!started || transferring || !callback.isWritable(this)) {
// TCP-flow control has to be done first than everything else otherwise we may lose notifications
if (!callback.isWritable(this) || !started || transferring ) {
return HandleStatus.BUSY;
}

View File

@ -1047,7 +1047,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString());
try {
if (!tx.isEffective()) {
if (tx.getState() != Transaction.State.PREPARED) {
// we don't want to rollback anything prepared here
if (tx.getXid() != null) {
resourceManager.removeTransaction(tx.getXid());
@ -1085,7 +1085,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
if (theTX.isEffective()) {
ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared");
ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it " + theTX.getState());
tx = null;
}
else {
@ -1568,9 +1568,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (theTx.getState() == State.ROLLEDBACK) {
Transaction newTX = newTransaction();
cancelAndRollback(clientFailed, newTX, wasStarted, toCancel);
throw new IllegalStateException("Transaction has already been rolled back");
}
cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
else {
cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
}
}
private void cancelAndRollback(boolean clientFailed,

View File

@ -22,6 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -32,6 +33,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
public class TransactionImpl implements Transaction {
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
private List<TransactionOperation> operations;
private static final int INITIAL_NUM_PROPERTIES = 10;
@ -105,8 +108,7 @@ public class TransactionImpl implements Transaction {
@Override
public boolean isEffective() {
return state == State.PREPARED || state == State.COMMITTED;
return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK;
}
@Override
@ -141,32 +143,43 @@ public class TransactionImpl implements Transaction {
@Override
public boolean hasTimedOut(final long currentTime, final int defaultTimeout) {
if (timeoutSeconds == -1) {
return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
}
else {
return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
synchronized (timeoutLock) {
boolean timedout;
if (timeoutSeconds == -1) {
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
}
else {
timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
}
if (timedout) {
markAsRollbackOnly(new ActiveMQException("TX Timeout"));
}
return timedout;
}
}
@Override
public void prepare() throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::" + this);
}
storageManager.readLock();
try {
synchronized (timeoutLock) {
if (isEffective()) {
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call");
ActiveMQServerLogger.LOGGER.debug("TransactionImpl::prepare::" + this + " is being ignored");
return;
}
if (state == State.ROLLBACK_ONLY) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
}
internalRollback();
if (exception != null) {
// this TX will never be rolled back,
// so we reset it now
beforeRollback();
afterRollback();
if (operations != null) {
operations.clear();
}
throw exception;
}
else {
@ -216,14 +229,17 @@ public class TransactionImpl implements Transaction {
@Override
public void commit(final boolean onePhase) throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::commit::" + this);
}
synchronized (timeoutLock) {
if (state == State.COMMITTED) {
// I don't think this could happen, but just in case
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call");
ActiveMQServerLogger.LOGGER.debug("TransactionImpl::commit::" + this + " is being ignored");
return;
}
if (state == State.ROLLBACK_ONLY) {
rollback();
internalRollback();
if (exception != null) {
throw exception;
@ -236,12 +252,12 @@ public class TransactionImpl implements Transaction {
if (xid != null) {
if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) {
throw new IllegalStateException("Transaction is in invalid state " + state);
throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
}
}
else {
if (state != State.ACTIVE) {
throw new IllegalStateException("Transaction is in invalid state " + state);
throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
}
}
@ -249,6 +265,11 @@ public class TransactionImpl implements Transaction {
doCommit();
// We want to make sure that nothing else gets done after the commit is issued
// this will eliminate any possibility or races
final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null;
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
@ -263,7 +284,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
afterCommit();
afterCommit(operationsToComplete);
}
});
@ -285,44 +306,65 @@ public class TransactionImpl implements Transaction {
@Override
public void rollback() throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::rollback::" + this);
}
synchronized (timeoutLock) {
if (state == State.ROLLEDBACK) {
// I don't think this could happen, but just in case
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace"));
ActiveMQServerLogger.LOGGER.debug("TransactionImpl::rollback::" + this + " is being ignored");
return;
}
if (xid != null) {
if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
throw new IllegalStateException("Transaction is in invalid state " + state);
throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
}
}
else {
if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
throw new IllegalStateException("Transaction is in invalid state " + state);
throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
}
}
beforeRollback();
internalRollback();
}
}
private void internalRollback() throws Exception {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::internalRollback " + this);
}
beforeRollback();
try {
doRollback();
state = State.ROLLEDBACK;
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
storageManager.afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
}
@Override
public void done() {
afterRollback();
}
});
}
catch (IllegalStateException e) {
// Something happened before and the TX didn't make to the Journal / Storage
// We will like to execute afterRollback and clear anything pending
ActiveMQServerLogger.LOGGER.warn(e);
}
// We want to make sure that nothing else gets done after the commit is issued
// this will eliminate any possibility or races
final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null;
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
storageManager.afterCompleteOperations(new IOCallback() {
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
}
public void done() {
afterRollback(operationsToComplete);
}
});
}
@Override
@ -361,10 +403,14 @@ public class TransactionImpl implements Transaction {
}
@Override
public void markAsRollbackOnly(final ActiveMQException exception1) {
public void markAsRollbackOnly(final ActiveMQException exception) {
synchronized (timeoutLock) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage());
}
if (isEffective()) {
ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)");
ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)");
return;
}
@ -373,7 +419,7 @@ public class TransactionImpl implements Transaction {
}
state = State.ROLLBACK_ONLY;
this.exception = exception1;
this.exception = exception;
}
}
@ -434,19 +480,23 @@ public class TransactionImpl implements Transaction {
}
}
private synchronized void afterCommit() {
if (operations != null) {
for (TransactionOperation operation : operations) {
private synchronized void afterCommit(List<TransactionOperation> oeprationsToComplete) {
if (oeprationsToComplete != null) {
for (TransactionOperation operation : oeprationsToComplete) {
operation.afterCommit(this);
}
// Help out GC here
oeprationsToComplete.clear();
}
}
private synchronized void afterRollback() {
if (operations != null) {
for (TransactionOperation operation : operations) {
private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) {
if (oeprationsToComplete != null) {
for (TransactionOperation operation : oeprationsToComplete) {
operation.afterRollback(this);
}
// Help out GC here
oeprationsToComplete.clear();
}
}

View File

@ -0,0 +1,673 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class TransactionImplTest extends ActiveMQTestBase {
@Test
public void testTimeoutAndThenCommitWithARollback() throws Exception {
TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10));
final AtomicInteger commit = new AtomicInteger(0);
final AtomicInteger rollback = new AtomicInteger(0);
tx.addOperation(new TransactionOperation() {
@Override
public void beforePrepare(Transaction tx) throws Exception {
}
@Override
public void afterPrepare(Transaction tx) {
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
}
@Override
public void afterCommit(Transaction tx) {
System.out.println("commit...");
commit.incrementAndGet();
}
@Override
public void beforeRollback(Transaction tx) throws Exception {
}
@Override
public void afterRollback(Transaction tx) {
System.out.println("rollback...");
rollback.incrementAndGet();
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
@Override
public List<MessageReference> getListOnConsumer(long consumerID) {
return null;
}
});
for (int i = 0; i < 2; i++) {
try {
tx.commit();
Assert.fail("Exception expected!");
}
catch (ActiveMQException expected) {
}
}
// it should just be ignored!
tx.rollback();
Assert.assertEquals(0, commit.get());
Assert.assertEquals(1, rollback.get());
}
@Test
public void testTimeoutThenRollbackWithRollback() throws Exception {
TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10));
final AtomicInteger commit = new AtomicInteger(0);
final AtomicInteger rollback = new AtomicInteger(0);
tx.addOperation(new TransactionOperation() {
@Override
public void beforePrepare(Transaction tx) throws Exception {
}
@Override
public void afterPrepare(Transaction tx) {
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
}
@Override
public void afterCommit(Transaction tx) {
System.out.println("commit...");
commit.incrementAndGet();
}
@Override
public void beforeRollback(Transaction tx) throws Exception {
}
@Override
public void afterRollback(Transaction tx) {
System.out.println("rollback...");
rollback.incrementAndGet();
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
@Override
public List<MessageReference> getListOnConsumer(long consumerID) {
return null;
}
});
tx.rollback();
// This is a case where another failure was detected (In parallel with the TX timeout for instance)
tx.markAsRollbackOnly(new ActiveMQException("rollback only again"));
tx.rollback();
Assert.assertEquals(0, commit.get());
Assert.assertEquals(1, rollback.get());
}
class FakeSM implements StorageManager {
@Override
public OperationContext getContext() {
return null;
}
@Override
public void lineUpContext() {
}
@Override
public OperationContext newContext(Executor executor) {
return null;
}
@Override
public OperationContext newSingleThreadContext() {
return null;
}
@Override
public void setContext(OperationContext context) {
}
@Override
public void stop(boolean ioCriticalError) throws Exception {
}
@Override
public void pageClosed(SimpleString storeName, int pageNumber) {
}
@Override
public void pageDeleted(SimpleString storeName, int pageNumber) {
}
@Override
public void pageWrite(PagedMessage message, int pageNumber) {
}
@Override
public void afterCompleteOperations(IOCallback run) {
run.done();
}
@Override
public boolean waitOnOperations(long timeout) throws Exception {
return false;
}
@Override
public void waitOnOperations() throws Exception {
}
@Override
public void beforePageRead() throws Exception {
}
@Override
public void afterPageRead() throws Exception {
}
@Override
public ByteBuffer allocateDirectBuffer(int size) {
return null;
}
@Override
public void freeDirectBuffer(ByteBuffer buffer) {
}
@Override
public void clearContext() {
}
@Override
public void confirmPendingLargeMessageTX(Transaction transaction,
long messageID,
long recordID) throws Exception {
}
@Override
public void confirmPendingLargeMessage(long recordID) throws Exception {
}
@Override
public void storeMessage(ServerMessage message) throws Exception {
}
@Override
public void storeReference(long queueID, long messageID, boolean last) throws Exception {
}
@Override
public void deleteMessage(long messageID) throws Exception {
}
@Override
public void storeAcknowledge(long queueID, long messageID) throws Exception {
}
@Override
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception {
}
@Override
public void updateDeliveryCount(MessageReference ref) throws Exception {
}
@Override
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
}
@Override
public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception {
}
@Override
public void deleteDuplicateID(long recordID) throws Exception {
}
@Override
public void storeMessageTransactional(long txID, ServerMessage message) throws Exception {
}
@Override
public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception {
}
@Override
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception {
}
@Override
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception {
}
@Override
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception {
}
@Override
public void deleteCursorAcknowledge(long ackID) throws Exception {
}
@Override
public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception {
}
@Override
public void deletePageComplete(long ackID) throws Exception {
}
@Override
public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception {
}
@Override
public void storeDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) throws Exception {
}
@Override
public void updateDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) throws Exception {
}
@Override
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception {
}
@Override
public LargeServerMessage createLargeMessage() {
return null;
}
@Override
public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception {
return null;
}
@Override
public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
return null;
}
@Override
public void prepare(long txID, Xid xid) throws Exception {
}
@Override
public void commit(long txID) throws Exception {
}
@Override
public void commit(long txID, boolean lineUpContext) throws Exception {
}
@Override
public void rollback(long txID) throws Exception {
}
@Override
public void rollbackBindings(long txID) throws Exception {
}
@Override
public void commitBindings(long txID) throws Exception {
}
@Override
public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception {
}
@Override
public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception {
}
@Override
public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception {
}
@Override
public void deletePageTransactional(long recordID) throws Exception {
}
@Override
public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return null;
}
@Override
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception {
return 0;
}
@Override
public void deleteHeuristicCompletion(long id) throws Exception {
}
@Override
public void addQueueBinding(long tx, Binding binding) throws Exception {
}
@Override
public void deleteQueueBinding(long tx, long queueBindingID) throws Exception {
}
@Override
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception {
return null;
}
@Override
public void addGrouping(GroupBinding groupBinding) throws Exception {
}
@Override
public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception {
}
@Override
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception {
}
@Override
public void deleteAddressSetting(SimpleString addressMatch) throws Exception {
}
@Override
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception {
return null;
}
@Override
public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception {
}
@Override
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception {
}
@Override
public List<PersistedRoles> recoverPersistedRoles() throws Exception {
return null;
}
@Override
public long storePageCounter(long txID, long queueID, long value) throws Exception {
return 0;
}
@Override
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
return 0;
}
@Override
public void deleteIncrementRecord(long txID, long recordID) throws Exception {
}
@Override
public void deletePageCounter(long txID, long recordID) throws Exception {
}
@Override
public void deletePendingPageCounter(long txID, long recordID) throws Exception {
}
@Override
public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
return 0;
}
@Override
public long storePageCounterInc(long queueID, int add) throws Exception {
return 0;
}
@Override
public Journal getBindingsJournal() {
return null;
}
@Override
public Journal getMessageJournal() {
return null;
}
@Override
public void startReplication(ReplicationManager replicationManager,
PagingManager pagingManager,
String nodeID,
boolean autoFailBack,
long initialReplicationSyncTimeout) throws Exception {
}
@Override
public boolean addToPage(PagingStore store,
ServerMessage msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
return false;
}
@Override
public void stopReplication() {
}
@Override
public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception {
}
@Override
public void storeID(long journalID, long id) throws Exception {
}
@Override
public void deleteID(long journalD) throws Exception {
}
@Override
public void readLock() {
}
@Override
public void readUnLock() {
}
@Override
public void persistIdGenerator() {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public boolean isStarted() {
return false;
}
@Override
public long generateID() {
return 0;
}
@Override
public long getCurrentID() {
return 0;
}
}
}

View File

@ -0,0 +1,228 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* This will simulate a failure of a failure.
* The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again.
* this should make the bridge to always reconnect itself.
*/
@RunWith(BMUnitRunner.class)
public class ClusteredBridgeReconnectTest extends ClusterTestBase {
static ThreadLocal<Boolean> inConnect = new ThreadLocal<Boolean>();
public static void enterConnect() {
inConnect.set(Boolean.TRUE);
}
public static void exitConnect() {
inConnect.set(null);
}
public static volatile boolean shouldFail = false;
public static void send() {
if (inConnect.get() != null) {
if (shouldFail) {
shouldFail = false;
throw new NullPointerException("just because it's a test...");
}
}
}
@Test
@BMRules(
rules = {@BMRule(
name = "enter",
targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl",
targetMethod = "connect",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.enterConnect();"), @BMRule(
name = "exit",
targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl",
targetMethod = "connect",
targetLocation = "EXIT",
action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.exitConnect();"), @BMRule(
name = "send",
targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl",
targetMethod = "send(org.apache.activemq.artemis.core.protocol.core.Packet)",
targetLocation = "EXIT",
action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.send();")
})
public void testReconnectBridge() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[0].createSession();
session0.start();
session1.start();
ClientProducer producer = session0.createProducer("queues.testaddress");
int NUMBER_OF_MESSAGES = 100;
Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size());
ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
Assert.assertEquals(1, connection.getRecords().size());
MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
session0.commit();
if (i == 17) {
shouldFail = true;
bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
}
}
int cons0Count = 0, cons1Count = 0;
while (true) {
ClientMessage msg = consumers[0].getConsumer().receive(1000);
if (msg == null) {
break;
}
cons0Count++;
msg.acknowledge();
session0.commit();
}
while (true) {
ClientMessage msg = consumers[1].getConsumer().receive(1000);
if (msg == null) {
break;
}
cons1Count++;
msg.acknowledge();
session1.commit();
}
Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count);
session0.commit();
session1.commit();
stopServers(0, 1);
}
static CountDownLatch latch;
static CountDownLatch latch2;
static Thread main;
public static void pause(SimpleString clusterName) {
if (clusterName.toString().startsWith("queue0")) {
try {
latch2.countDown();
latch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void pause2(Notification notification) {
if (notification.getType() == CoreNotificationType.BINDING_REMOVED) {
SimpleString clusterName = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
boolean inMain = main == Thread.currentThread();
if (clusterName.toString().startsWith("queue0") && !inMain) {
try {
latch2.countDown();
latch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void restart2() {
latch.countDown();
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
shouldFail = false;
}
@Override
@After
public void tearDown() throws Exception {
closeAllConsumers();
closeAllSessionFactories();
closeAllServerLocatorsFactories();
super.tearDown();
}
public boolean isNetty() {
return true;
}
}

View File

@ -74,6 +74,11 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
latchFlag.setCount(1);
}
@Override
protected boolean usePersistence() {
return true;
}
@Test
@BMRules(
rules = {@BMRule(
@ -84,6 +89,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")})
public void testConcurrentCancels() throws Exception {
System.out.println(server.getConfiguration().getJournalLocation().toString());
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings();
settings.setMaxDeliveryAttempts(-1);
@ -184,18 +190,6 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
}
}
});
//
// consumer.close();
//
// threads.add(new Thread("ClientFailing")
// {
// public void run()
// {
// ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
// impl.getConnection().fail(new HornetQException("failure"));
// }
// });
//
for (Thread t : threads) {
t.start();
@ -213,47 +207,55 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
}
Connection connection = cf.createConnection();
connection.setClientID("myID");
try {
connection.setClientID("myID");
Thread.sleep(2000); // I am too lazy to call end on all the transactions
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
HashMap<Integer, AtomicInteger> mapCount = new HashMap<>();
Thread.sleep(5000); // I am too lazy to call end on all the transactions
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
HashMap<Integer, AtomicInteger> mapCount = new HashMap<>();
while (true) {
TextMessage message = (TextMessage) consumer.receiveNoWait();
if (message == null) {
break;
while (true) {
TextMessage message = (TextMessage) consumer.receiveNoWait();
if (message == null) {
break;
}
Integer value = message.getIntProperty("i");
AtomicInteger count = mapCount.get(value);
if (count == null) {
count = new AtomicInteger(0);
mapCount.put(message.getIntProperty("i"), count);
}
count.incrementAndGet();
}
Integer value = message.getIntProperty("i");
AtomicInteger count = mapCount.get(value);
if (count == null) {
count = new AtomicInteger(0);
mapCount.put(message.getIntProperty("i"), count);
boolean failed = false;
for (int i = 0; i < numberOfMessages; i++) {
AtomicInteger count = mapCount.get(i);
if (count == null) {
System.out.println("Message " + i + " not received");
failed = true;
}
else if (count.get() > 1) {
System.out.println("Message " + i + " received " + count.get() + " times");
failed = true;
}
}
count.incrementAndGet();
if (failed) {
System.err.println("Failed");
System.exit(-1);
}
Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed);
}
boolean failed = false;
for (int i = 0; i < numberOfMessages; i++) {
AtomicInteger count = mapCount.get(i);
if (count == null) {
System.out.println("Message " + i + " not received");
failed = true;
}
else if (count.get() > 1) {
System.out.println("Message " + i + " received " + count.get() + " times");
failed = true;
}
finally {
connection.close();
}
Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed);
connection.close();
}
}

View File

@ -27,6 +27,7 @@ import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -91,7 +92,7 @@ public class TimeoutXATest extends ActiveMQTestBase {
@BMRule(
name = "afterRollback TX",
targetClass = "org.apache.activemq.artemis.core.transaction.impl.TransactionImpl",
targetMethod = "afterRollback()",
targetMethod = "afterRollback",
targetLocation = "ENTRY",
helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest",
action = "afterRollback()")})
@ -166,23 +167,20 @@ public class TimeoutXATest extends ActiveMQTestBase {
Thread.sleep(1000);
removingTXAwait0.countDown();
enteredRollbackLatch.await();
Assert.assertTrue(enteredRollbackLatch.await(10, TimeUnit.SECONDS));
waitingRollbackLatch.countDown();
t.join();
consumer.close();
//
// connction2.start();
//
consumer = session.createConsumer(queue);
for (int i = 0; i < 10; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
// session.commit();
// session.close();
connection.close();
connction2.close();

View File

@ -70,6 +70,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
ServerLocator nettyLocator;
// This thread will keep bugging the handlers.
// if they behave well with XA, the test pass!
final AtomicBoolean running = new AtomicBoolean(true);
private volatile boolean playTXTimeouts = true;
private volatile boolean playServerClosingSession = true;
private volatile boolean playServerClosingConsumer = true;
@ -85,6 +89,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
super.setUp();
createQueue(true, "outQueue");
DummyTMLocator.startTM();
running.set(true);
}
@Override
@ -113,6 +118,11 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
server.getAddressSettingsRepository().addMatch("#", settings);
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
resourceAdapter = qResourceAdapter;
resourceAdapter.setConfirmationWindowSize(-1);
resourceAdapter.setCallTimeout(1000L);
resourceAdapter.setConsumerWindowSize(1024 * 1024);
resourceAdapter.setReconnectAttempts(-1);
resourceAdapter.setRetryInterval(100L);
// qResourceAdapter.setTransactionManagerLocatorClass(DummyTMLocator.class.getName());
// qResourceAdapter.setTransactionManagerLocatorMethod("getTM");
@ -125,17 +135,18 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
final int NUMBER_OF_SESSIONS = 10;
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setMaxSession(NUMBER_OF_SESSIONS);
spec.setTransactionTimeout(1);
spec.setReconnectAttempts(-1);
spec.setConfirmationWindowSize(-1);
spec.setReconnectInterval(1000);
spec.setCallTimeout(1000L);
spec.setMaxSession(NUMBER_OF_SESSIONS);
spec.setSetupAttempts(-1);
spec.setSetupInterval(100);
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setConsumerWindowSize(1024 * 1024);
// Some the routines would be screwed up if using the default one
Assert.assertFalse(spec.isHasBeenUpdated());
TestEndpointFactory endpointFactory = new TestEndpointFactory(true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
@ -146,7 +157,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
final int NUMBER_OF_MESSAGES = 1000;
Thread producer = new Thread() {
@Override
public void run() {
try {
ServerLocator locator = createInVMLocator(0);
@ -155,11 +165,18 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
StringBuffer buffer = new StringBuffer();
for (int b = 0; b < 500; b++) {
buffer.append("ab");
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring " + i);
message.getBodyBuffer().writeString(buffer.toString() + i);
message.putIntProperty("i", i);
clientProducer.send(message);
@ -181,12 +198,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
final AtomicBoolean metaDataFailed = new AtomicBoolean(false);
// This thread will keep bugging the handlers.
// if they behave well with XA, the test pass!
final AtomicBoolean running = new AtomicBoolean(true);
Thread buggerThread = new Thread() {
@Override
public void run() {
while (running.get()) {
try {
@ -197,7 +209,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
return;
}
List<ServerSession> serverSessions = lookupServerSessions("resource-adapter");
List<ServerSession> serverSessions = lookupServerSessions("resource-adapter", NUMBER_OF_SESSIONS);
System.err.println("Contains " + serverSessions.size() + " RA sessions");
@ -256,6 +268,13 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
break;
}
if (i == NUMBER_OF_MESSAGES * 0.50) {
// This is to make sure the MDBs will survive a reboot
// and no duplications or message loss will happen because of this
System.err.println("Rebooting the MDBs at least once!");
activation.startReconnectThread("I");
}
if (i == NUMBER_OF_MESSAGES * 0.90) {
System.out.println("Disabled failures at " + i);
playTXTimeouts = false;
@ -266,17 +285,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
System.out.println("Received " + i + " messages");
Assert.assertNotNull(message);
message.acknowledge();
Integer value = message.getIntProperty("i");
AtomicInteger mapCount = new AtomicInteger(1);
mapCount = mapCounter.putIfAbsent(value, mapCount);
if (mapCount != null) {
mapCount.incrementAndGet();
}
doReceiveMessage(message);
if (i % 200 == 0) {
System.out.println("received " + i);
@ -285,6 +294,20 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
}
session.commit();
while (true) {
ClientMessage message = consumer.receiveImmediate();
if (message == null) {
break;
}
System.out.println("Received extra message " + message);
doReceiveMessage(message);
}
session.commit();
Assert.assertNull(consumer.receiveImmediate());
StringWriter writer = new StringWriter();
@ -328,14 +351,42 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
}
private List<ServerSession> lookupServerSessions(String parameter) {
List<ServerSession> serverSessions = new LinkedList<>();
private void doReceiveMessage(ClientMessage message) throws Exception {
Assert.assertNotNull(message);
message.acknowledge();
Integer value = message.getIntProperty("i");
AtomicInteger mapCount = new AtomicInteger(1);
for (ServerSession session : server.getSessions()) {
if (session.getMetaData(parameter) != null) {
serverSessions.add(session);
}
mapCount = mapCounter.putIfAbsent(value, mapCount);
if (mapCount != null) {
mapCount.incrementAndGet();
}
}
private List<ServerSession> lookupServerSessions(String parameter, int numberOfSessions) {
long timeout = System.currentTimeMillis() + 50000;
List<ServerSession> serverSessions = new LinkedList<ServerSession>();
do {
if (!serverSessions.isEmpty()) {
System.err.println("Retry on serverSessions!!! currently with " + serverSessions.size());
serverSessions.clear();
try {
Thread.sleep(100);
}
catch (Exception e) {
break;
}
}
serverSessions.clear();
for (ServerSession session : server.getSessions()) {
if (session.getMetaData(parameter) != null) {
serverSessions.add(session);
}
}
} while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
System.err.println("Returning " + serverSessions.size() + " sessions");
return serverSessions;
}
@ -347,7 +398,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
isDeliveryTransacted = deliveryTransacted;
}
@Override
public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
TestEndpoint retEnd = new TestEndpoint();
if (xaResource != null) {
@ -356,7 +406,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
return retEnd;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return isDeliveryTransacted;
}
@ -397,7 +446,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
}
@Override
public void onMessage(Message message) {
Integer value = 0;

View File

@ -680,6 +680,10 @@ public class BridgeReconnectTest extends BridgeTestBase {
}
}
for (int i = 0; i < 100 && queue.getDeliveringCount() != 0; i++) {
Thread.sleep(10);
}
System.out.println("Check.. DeliveringCount: " + queue.getDeliveringCount());
assertEquals("Delivering count of a source queue should be zero on connection failure", 0, queue.getDeliveringCount());

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -185,7 +186,12 @@ public class AsynchronousFailoverTest extends FailoverTestBase {
AsynchronousFailoverTest.log.info("Fail complete");
t.join();
t.join(TimeUnit.SECONDS.toMillis(20));
if (t.isAlive()) {
System.out.println(threadDump("Thread still running from the test"));
t.interrupt();
fail("Test didn't complete successful, thread still running");
}
runnable.checkForExceptions();

View File

@ -16,13 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.ra;
import javax.jms.Connection;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -35,16 +43,6 @@ import org.apache.activemq.artemis.tests.unit.ra.MessageEndpointFactory;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Test;
import javax.jms.Connection;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
public class ResourceAdapterTest extends ActiveMQRATestBase {
@Test
@ -86,29 +84,17 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ra.getDefaultActiveMQConnectionFactory().getServerLocator();
Field f = Class.forName(ServerLocatorImpl.class.getName()).getDeclaredField("factories");
Set<XARecoveryConfig> resources = ra.getRecoveryManager().getResources();
f.setAccessible(true);
Set<ClientSessionFactoryInternal> factories = (Set<ClientSessionFactoryInternal>) f.get(serverLocator);
for (int i = 0; i < 10; i++) {
System.out.println(i);
assertEquals(factories.size(), 0);
activation.start();
assertEquals(factories.size(), 15);
assertEquals(1, resources.size());
activation.stop();
assertEquals(factories.size(), 0);
}
System.out.println("before RA stop => " + factories.size());
ra.stop();
assertEquals(0, resources.size());
System.out.println("after RA stop => " + factories.size());
assertEquals(factories.size(), 0);
locator.close();
}
@ -402,7 +388,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
DiscoveryGroupConfiguration dc = fac.getServerLocator().getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory udpDg = (UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory();
assertEquals(udpDg.getGroupAddress(), "231.6.6.6");
@ -430,7 +416,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDiscoveryPort(1234);
spec.setDiscoveryInitialWaitTimeout(1L);
spec.setDiscoveryRefreshTimeout(1L);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
DiscoveryGroupConfiguration dc = fac.getServerLocator().getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory udpDg = (UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory();
assertEquals(udpDg.getGroupAddress(), "231.6.6.6");
@ -455,7 +441,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertTrue(fac.isHA());
@ -477,7 +463,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertFalse(fac.isHA());
@ -499,7 +485,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setHA(true);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertTrue(fac.isHA());
@ -522,7 +508,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertEquals(100, fac.getReconnectAttempts());
@ -544,7 +530,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertEquals(-1, fac.getReconnectAttempts());
@ -566,7 +552,7 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setReconnectAttempts(100);
ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec);
ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec);
assertEquals(100, fac.getReconnectAttempts());

View File

@ -381,6 +381,9 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
else {
clientSession.rollback(xid);
}
xids = clientSession.recover(XAResource.TMSTARTRSCAN);
Assert.assertEquals(xids.length, 0);
}
@Test

View File

@ -24,22 +24,22 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.ra.ConnectionFactoryProperties;
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.ConnectionFactoryProperties;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@ -99,7 +99,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
public void testCreateConnectionFactoryNoOverrides() throws Exception {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setConnectorClassName(InVMConnectorFactory.class.getName());
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties());
ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties());
Assert.assertEquals(factory.getCallTimeout(), ActiveMQClient.DEFAULT_CALL_TIMEOUT);
Assert.assertEquals(factory.getClientFailureCheckPeriod(), ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
Assert.assertEquals(factory.getClientID(), null);
@ -211,7 +211,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
connectionFactoryProperties.setThreadPoolMaxSize(17);
connectionFactoryProperties.setTransactionBatchSize(18);
connectionFactoryProperties.setUseGlobalPools(!ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS);
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties);
ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties);
Assert.assertEquals(factory.getCallTimeout(), 1);
Assert.assertEquals(factory.getClientFailureCheckPeriod(), 2);
Assert.assertEquals(factory.getClientID(), "myid");
@ -245,7 +245,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
ArrayList<String> value = new ArrayList<>();
value.add(NettyConnectorFactory.class.getName());
connectionFactoryProperties.setParsedConnectorClassNames(value);
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties);
ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties);
ActiveMQConnectionFactory defaultFactory = ra.getDefaultActiveMQConnectionFactory();
Assert.assertNotSame(factory, defaultFactory);
}
@ -258,7 +258,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
connectionFactoryProperties.setDiscoveryAddress("myhost");
connectionFactoryProperties.setDiscoveryPort(5678);
connectionFactoryProperties.setDiscoveryLocalBindAddress("newAddress");
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties);
ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties);
ActiveMQConnectionFactory defaultFactory = ra.getDefaultActiveMQConnectionFactory();
Assert.assertNotSame(factory, defaultFactory);
DiscoveryGroupConfiguration dc = factory.getServerLocator().getDiscoveryGroupConfiguration();
@ -272,7 +272,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
public void testCreateConnectionFactoryMultipleConnectors() {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setConnectorClassName(NETTY_CONNECTOR_FACTORY + "," + INVM_CONNECTOR_FACTORY + "," + NETTY_CONNECTOR_FACTORY);
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties());
ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties());
TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations();
assertNotNull(configurations);
assertEquals(3, configurations.length);
@ -289,7 +289,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setConnectorClassName(NETTY_CONNECTOR_FACTORY + "," + INVM_CONNECTOR_FACTORY + "," + NETTY_CONNECTOR_FACTORY);
ra.setConnectionParameters("host=host1;port=61616, serverid=0, host=host2;port=61617");
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties());
ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties());
TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations();
assertNotNull(configurations);
assertEquals(3, configurations.length);
@ -316,7 +316,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
value.add(NETTY_CONNECTOR_FACTORY);
value.add(INVM_CONNECTOR_FACTORY);
overrideProperties.setParsedConnectorClassNames(value);
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(overrideProperties);
ActiveMQConnectionFactory factory = ra.getConnectionFactory(overrideProperties);
TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations();
assertNotNull(configurations);
assertEquals(3, configurations.length);
@ -351,7 +351,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
map3.put("serverid", "1");
connectionParameters.add(map3);
overrideProperties.setParsedConnectionParameters(connectionParameters);
ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(overrideProperties);
ActiveMQConnectionFactory factory = ra.getConnectionFactory(overrideProperties);
TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations();
assertNotNull(configurations);
assertEquals(3, configurations.length);
@ -372,7 +372,7 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ConnectionFactoryProperties connectionFactoryProperties = new ConnectionFactoryProperties();
try {
ra.createActiveMQConnectionFactory(connectionFactoryProperties);
ra.getConnectionFactory(connectionFactoryProperties);
Assert.fail("should throw exception");
}
catch (IllegalArgumentException e) {