ActiveMQ6-6 Factor out WildFly XA Recovery
Pulls out WildFly XA Recovery specifics into a different project. Some XA recovery code is still present and is used as integration points for integrating TM XA recovery processes.
This commit is contained in:
parent
d7c7d86d55
commit
8f91af1b5c
|
@ -50,13 +50,14 @@
|
|||
<artifactId>geronimo-ejb_3.0_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.jbossts.jts</groupId>
|
||||
<artifactId>jbossjts-jacorb</artifactId>
|
||||
<optional>true</optional>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
<version>1.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-service-extensions</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -63,9 +64,10 @@ import org.apache.activemq.jms.client.ActiveMQConnection;
|
|||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
|
||||
import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.service.extensions.ServiceUtils;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.utils.DefaultSensitiveStringCodec;
|
||||
import org.apache.activemq.utils.PasswordMaskingUtil;
|
||||
|
@ -183,7 +185,7 @@ public final class JMSBridgeImpl implements JMSBridge
|
|||
|
||||
private static final int FORWARD_MODE_NONTX = 2;
|
||||
|
||||
private ActiveMQRegistryBase registry;
|
||||
private ActiveMQRegistry registry;
|
||||
|
||||
/*
|
||||
* Constructor for MBean
|
||||
|
@ -2228,7 +2230,15 @@ public final class JMSBridgeImpl implements JMSBridge
|
|||
{
|
||||
try
|
||||
{
|
||||
registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
|
||||
ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
|
||||
if (sl.iterator().hasNext())
|
||||
{
|
||||
registry = sl.iterator().next();
|
||||
}
|
||||
else
|
||||
{
|
||||
registry = ActiveMQRegistryImpl.getInstance();
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.jms.server;
|
||||
|
||||
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.jboss.logging.BasicLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.jboss.logging.annotations.Cause;
|
||||
|
@ -90,11 +89,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122015, value = "Can not connect to {0} on auto-generated resource recovery",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
|
||||
|
||||
@LogMessage(level = Logger.Level.DEBUG)
|
||||
@Message(id = 122016, value = "Error in XA Recovery" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoveryError(@Cause Exception e);
|
||||
|
@ -104,11 +98,6 @@ public interface ActiveMQJMSServerLogger extends BasicLogger
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToCorrectHost(@Cause Exception e, String name);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122018, value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoveryStartError(XARecoveryConfig e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 124000, value = "key attribute missing for JMS configuration {0}" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void jmsConfigMissingKey(Node e);
|
||||
|
|
|
@ -1,254 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.activemq.api.core.Pair;
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
|
||||
import org.jboss.tm.XAResourceRecovery;
|
||||
|
||||
/**
|
||||
* <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
|
||||
* <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link ActiveMQRecoveryRegistry#register(XARecoveryConfig)}</p>
|
||||
* <p>Later the {@link RecoveryDiscovery} will call {@link ActiveMQRecoveryRegistry#nodeUp(String, Pair, String, String)}
|
||||
* so we will keep a track of nodes on the cluster
|
||||
* or nodes where this server is connected to. </p>
|
||||
*
|
||||
* @author clebertsuconic
|
||||
*/
|
||||
public class ActiveMQRecoveryRegistry implements XAResourceRecovery
|
||||
{
|
||||
|
||||
private static final ActiveMQRecoveryRegistry theInstance = new ActiveMQRecoveryRegistry();
|
||||
|
||||
private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
|
||||
|
||||
/**
|
||||
* The list by server id and resource adapter wrapper, what will actually be calling recovery.
|
||||
* This will be returned by getXAResources
|
||||
*/
|
||||
private final ConcurrentHashMap<String, ActiveMQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, ActiveMQXAResourceWrapper>();
|
||||
|
||||
/**
|
||||
* In case of failures, we retry on the next getXAResources
|
||||
*/
|
||||
private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
|
||||
|
||||
private ActiveMQRecoveryRegistry()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called periodically by the Transaction Manager
|
||||
*/
|
||||
public XAResource[] getXAResources()
|
||||
{
|
||||
try
|
||||
{
|
||||
checkFailures();
|
||||
|
||||
ActiveMQXAResourceWrapper[] resourceArray = new ActiveMQXAResourceWrapper[recoveries.size()];
|
||||
resourceArray = recoveries.values().toArray(resourceArray);
|
||||
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
|
||||
for (Map.Entry<String, ActiveMQXAResourceWrapper> entry : recoveries.entrySet())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
|
||||
}
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
|
||||
}
|
||||
|
||||
return resourceArray;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
return new XAResource[]{};
|
||||
}
|
||||
}
|
||||
|
||||
public static ActiveMQRecoveryRegistry getInstance()
|
||||
{
|
||||
return theInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called by then resource adapters, to register a new discovery
|
||||
*
|
||||
* @param resourceConfig
|
||||
*/
|
||||
public void register(final XARecoveryConfig resourceConfig)
|
||||
{
|
||||
RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
|
||||
RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
|
||||
if (discoveryRecord == null)
|
||||
{
|
||||
discoveryRecord = newInstance;
|
||||
discoveryRecord.start(false);
|
||||
}
|
||||
// you could have a configuration shared with multiple MDBs or RAs
|
||||
discoveryRecord.incrementUsage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reference counts and deactivate a configuration
|
||||
* Notice: this won't remove the servers since a server may have previous XIDs
|
||||
*
|
||||
* @param resourceConfig
|
||||
*/
|
||||
public void unRegister(final XARecoveryConfig resourceConfig)
|
||||
{
|
||||
RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
|
||||
if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
|
||||
{
|
||||
discoveryRecord = configSet.remove(resourceConfig);
|
||||
if (discoveryRecord != null)
|
||||
{
|
||||
discoveryRecord.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but
|
||||
* maybe we should.
|
||||
*/
|
||||
public void stop()
|
||||
{
|
||||
for (RecoveryDiscovery recoveryDiscovery : configSet.values())
|
||||
{
|
||||
recoveryDiscovery.stop();
|
||||
}
|
||||
for (ActiveMQXAResourceWrapper activeMQXAResourceWrapper : recoveries.values())
|
||||
{
|
||||
activeMQXAResourceWrapper.close();
|
||||
}
|
||||
recoveries.clear();
|
||||
configSet.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* in case of a failure the Discovery will register itslef to retry
|
||||
*
|
||||
* @param failedDiscovery
|
||||
*/
|
||||
public void failedDiscovery(RecoveryDiscovery failedDiscovery)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
|
||||
synchronized (failedDiscoverySet)
|
||||
{
|
||||
failedDiscoverySet.add(failedDiscovery);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param nodeID
|
||||
* @param networkConfiguration
|
||||
* @param username
|
||||
* @param password
|
||||
*/
|
||||
public void nodeUp(String nodeID,
|
||||
Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
|
||||
String username,
|
||||
String password)
|
||||
{
|
||||
|
||||
if (recoveries.get(nodeID) == null)
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
|
||||
}
|
||||
XARecoveryConfig config = new XARecoveryConfig(true,
|
||||
extractTransportConfiguration(networkConfiguration),
|
||||
username,
|
||||
password);
|
||||
|
||||
ActiveMQXAResourceWrapper wrapper = new ActiveMQXAResourceWrapper(config);
|
||||
recoveries.putIfAbsent(nodeID, wrapper);
|
||||
}
|
||||
}
|
||||
|
||||
public void nodeDown(String nodeID)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* this will go through the list of retries
|
||||
*/
|
||||
private void checkFailures()
|
||||
{
|
||||
final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
|
||||
|
||||
// it will transfer all the discoveries to a new collection
|
||||
synchronized (failedDiscoverySet)
|
||||
{
|
||||
failures.addAll(failedDiscoverySet);
|
||||
failedDiscoverySet.clear();
|
||||
}
|
||||
|
||||
if (failures.size() > 0)
|
||||
{
|
||||
// This shouldn't happen on a regular scenario, however when this retry happens this needs
|
||||
// to be done on a new thread
|
||||
Thread t = new Thread("ActiveMQ Recovery Discovery Reinitialization")
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
for (RecoveryDiscovery discovery : failures)
|
||||
{
|
||||
try
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
|
||||
discovery.start(true);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param networkConfiguration
|
||||
* @return
|
||||
*/
|
||||
private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
|
||||
{
|
||||
if (networkConfiguration.getB() != null)
|
||||
{
|
||||
return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
|
||||
}
|
||||
return new TransportConfiguration[]{networkConfiguration.getA()};
|
||||
}
|
||||
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.tm.XAResourceRecoveryRegistry;
|
||||
|
||||
/**
|
||||
* This class is a base class for the integration layer where
|
||||
* This class is used on integration points and this is just a bridge to the real registry at
|
||||
* {@link ActiveMQRecoveryRegistry}
|
||||
*
|
||||
* @author Clebert
|
||||
*
|
||||
*
|
||||
*/
|
||||
public abstract class ActiveMQRegistryBase
|
||||
{
|
||||
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
public ActiveMQRegistryBase()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
public abstract XAResourceRecoveryRegistry getTMRegistry();
|
||||
|
||||
public void register(final XARecoveryConfig resourceConfig)
|
||||
{
|
||||
init();
|
||||
ActiveMQRecoveryRegistry.getInstance().register(resourceConfig);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void unRegister(final XARecoveryConfig resourceConfig)
|
||||
{
|
||||
init();
|
||||
ActiveMQRecoveryRegistry.getInstance().unRegister(resourceConfig);
|
||||
}
|
||||
|
||||
public void stop()
|
||||
{
|
||||
if (started.compareAndSet(true, false) && getTMRegistry() != null)
|
||||
{
|
||||
getTMRegistry().removeXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
|
||||
ActiveMQRecoveryRegistry.getInstance().stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void init()
|
||||
{
|
||||
if (started.compareAndSet(false, true) && getTMRegistry() != null)
|
||||
{
|
||||
getTMRegistry().addXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,236 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.api.core.ActiveMQException;
|
||||
import org.apache.activemq.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.api.core.Pair;
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.api.core.client.ClusterTopologyListener;
|
||||
import org.apache.activemq.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.api.core.client.TopologyMember;
|
||||
import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
|
||||
|
||||
/**
|
||||
* <p>This class will have a simple Connection Factory and will listen
|
||||
* for topology updates. </p>
|
||||
* <p>This Discovery is instantiated by {@link ActiveMQRecoveryRegistry}
|
||||
*
|
||||
* @author clebertsuconic
|
||||
*/
|
||||
public class RecoveryDiscovery implements SessionFailureListener
|
||||
{
|
||||
|
||||
private ServerLocator locator;
|
||||
private ClientSessionFactoryInternal sessionFactory;
|
||||
private final XARecoveryConfig config;
|
||||
private final AtomicInteger usage = new AtomicInteger(0);
|
||||
private boolean started = false;
|
||||
|
||||
|
||||
public RecoveryDiscovery(XARecoveryConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public synchronized void start(boolean retry)
|
||||
{
|
||||
if (!started)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
|
||||
started = true;
|
||||
|
||||
locator = config.createServerLocator();
|
||||
locator.disableFinalizeCheck();
|
||||
locator.addClusterTopologyListener(new InternalListener(config));
|
||||
try
|
||||
{
|
||||
sessionFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
|
||||
// We are using the SessionFactoryInternal here directly as we don't have information to connect with an user and password
|
||||
// on the session as all we want here is to get the topology
|
||||
// in case of failure we will retry
|
||||
sessionFactory.addFailureListener(this);
|
||||
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
|
||||
}
|
||||
catch (Exception startupError)
|
||||
{
|
||||
if (!retry)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
|
||||
}
|
||||
stop();
|
||||
ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void stop()
|
||||
{
|
||||
internalStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* we may have several connection factories referencing the same connection recovery entry.
|
||||
* Because of that we need to make a count of the number of the instances that are referencing it,
|
||||
* so we will remove it as soon as we are done
|
||||
*/
|
||||
public int incrementUsage()
|
||||
{
|
||||
return usage.decrementAndGet();
|
||||
}
|
||||
|
||||
public int decrementUsage()
|
||||
{
|
||||
return usage.incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void finalize()
|
||||
{
|
||||
// I don't think it's a good thing to synchronize a method on a finalize,
|
||||
// hence the internalStop (no sync) call here
|
||||
internalStop();
|
||||
}
|
||||
|
||||
protected void internalStop()
|
||||
{
|
||||
if (started)
|
||||
{
|
||||
started = false;
|
||||
try
|
||||
{
|
||||
if (sessionFactory != null)
|
||||
{
|
||||
sessionFactory.close();
|
||||
}
|
||||
}
|
||||
catch (Exception ignored)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
locator.close();
|
||||
}
|
||||
catch (Exception ignored)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
|
||||
}
|
||||
|
||||
sessionFactory = null;
|
||||
locator = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static final class InternalListener implements ClusterTopologyListener
|
||||
{
|
||||
private final XARecoveryConfig config;
|
||||
|
||||
public InternalListener(final XARecoveryConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeUP(TopologyMember topologyMember, boolean last)
|
||||
{
|
||||
// There is a case where the backup announce itself,
|
||||
// we need to ignore a case where getLive is null
|
||||
if (topologyMember.getLive() != null)
|
||||
{
|
||||
Pair<TransportConfiguration, TransportConfiguration> connector =
|
||||
new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(),
|
||||
topologyMember.getBackup());
|
||||
ActiveMQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
|
||||
config.getUsername(), config.getPassword());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDown(long eventUID, String nodeID)
|
||||
{
|
||||
// I'm not putting any node down, since it may have previous transactions hanging, however at some point we may
|
||||
//change it have some sort of timeout for removal
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver)
|
||||
{
|
||||
if (exception.getType() == ActiveMQExceptionType.DISCONNECTED)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
|
||||
}
|
||||
else
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
|
||||
exception);
|
||||
}
|
||||
internalStop();
|
||||
ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
|
||||
{
|
||||
connectionFailed(me, failedOver);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeReconnect(ActiveMQException exception)
|
||||
{
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see java.lang.Object#toString()
|
||||
*/
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "RecoveryDiscovery [config=" + config + ", started=" + started + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return config.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (o == null || (!(o instanceof RecoveryDiscovery)))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
RecoveryDiscovery discovery = (RecoveryDiscovery) o;
|
||||
|
||||
return config.equals(discovery.config);
|
||||
}
|
||||
|
||||
}
|
|
@ -30,7 +30,7 @@ import java.util.Iterator;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
|
||||
/**
|
||||
* ActiveMQ ManagedConnectionFactory
|
||||
|
|
|
@ -44,12 +44,12 @@ import org.apache.activemq.api.jms.ActiveMQJMSClient;
|
|||
import org.apache.activemq.core.client.impl.ClientSessionInternal;
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.ra.ActiveMQRABundle;
|
||||
import org.apache.activemq.ra.ActiveMQRAConnectionFactory;
|
||||
import org.apache.activemq.ra.ActiveMQRALogger;
|
||||
import org.apache.activemq.ra.ActiveMQRaUtils;
|
||||
import org.apache.activemq.ra.ActiveMQResourceAdapter;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.utils.FutureLatch;
|
||||
import org.apache.activemq.utils.SensitiveDataCodec;
|
||||
|
||||
|
|
|
@ -18,12 +18,14 @@ package org.apache.activemq.ra.recovery;
|
|||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.server.recovery.ActiveMQRegistryBase;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.ra.ActiveMQRALogger;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.utils.ConcurrentHashSet;
|
||||
|
||||
|
@ -33,7 +35,7 @@ import org.apache.activemq.utils.ConcurrentHashSet;
|
|||
*/
|
||||
public final class RecoveryManager
|
||||
{
|
||||
private ActiveMQRegistryBase registry;
|
||||
private ActiveMQRegistry registry;
|
||||
|
||||
private static final String RESOURCE_RECOVERY_CLASS_NAMES = "org.jboss.as.messaging.jms.AS7RecoveryRegistry;"
|
||||
+ "org.jboss.as.integration.activemq.recovery.AS5RecoveryRegistry";
|
||||
|
@ -97,7 +99,15 @@ public final class RecoveryManager
|
|||
{
|
||||
try
|
||||
{
|
||||
registry = (ActiveMQRegistryBase) safeInitNewInstance(locatorClasse);
|
||||
ServiceLoader<ActiveMQRegistry> sl = ServiceLoader.load(ActiveMQRegistry.class);
|
||||
if (sl.iterator().hasNext())
|
||||
{
|
||||
registry = sl.iterator().next();
|
||||
}
|
||||
else
|
||||
{
|
||||
registry = ActiveMQRegistryImpl.getInstance();
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
|
|
@ -18,6 +18,11 @@
|
|||
<artifactId>activemq-core-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-jms-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging</artifactId>
|
||||
|
|
|
@ -14,5 +14,20 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
/**
|
||||
* @author mtaylor
|
||||
*/
|
||||
|
||||
public interface ActiveMQRegistry
|
||||
{
|
||||
void register(final XARecoveryConfig resourceConfig);
|
||||
|
||||
void unRegister(final XARecoveryConfig resourceConfig);
|
||||
|
||||
void stop();
|
||||
|
||||
void init();
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
/**
|
||||
* @author mtaylor
|
||||
*/
|
||||
|
||||
public class ActiveMQRegistryImpl implements ActiveMQRegistry
|
||||
{
|
||||
private static ActiveMQRegistryImpl instance;
|
||||
|
||||
public static ActiveMQRegistry getInstance()
|
||||
{
|
||||
if (instance == null)
|
||||
{
|
||||
instance = new ActiveMQRegistryImpl();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(XARecoveryConfig resourceConfig)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unRegister(XARecoveryConfig resourceConfig)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init()
|
||||
{
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||
import org.jboss.logging.BasicLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.jboss.logging.annotations.Cause;
|
||||
import org.jboss.logging.annotations.LogMessage;
|
||||
import org.jboss.logging.annotations.Message;
|
||||
import org.jboss.logging.annotations.MessageLogger;
|
||||
import org.w3c.dom.Node;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
|
||||
* @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
|
||||
*
|
||||
* Logger Code 12
|
||||
*
|
||||
* each message id must be 6 digits long starting with 12, the 3rd digit donates the level so
|
||||
*
|
||||
* INF0 1
|
||||
* WARN 2
|
||||
* DEBUG 3
|
||||
* ERROR 4
|
||||
* TRACE 5
|
||||
* FATAL 6
|
||||
*
|
||||
* so an INFO message would be 121000 to 121999
|
||||
*/
|
||||
@MessageLogger(projectCode = "AMQ")
|
||||
public interface ActiveMQXARecoveryLogger extends BasicLogger
|
||||
{
|
||||
/**
|
||||
* The default logger.
|
||||
*/
|
||||
ActiveMQXARecoveryLogger LOGGER = Logger.getMessageLogger(ActiveMQXARecoveryLogger.class, ActiveMQXARecoveryLogger.class.getPackage().getName());
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 121003, value = "JMS Server Manager Running cached command for {0}" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void serverRunningCachedCommand(Runnable run);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 121004, value = "JMS Server Manager Caching command for {0} since the JMS Server is not active yet",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void serverCachingCommand(Object runnable);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void invalidHostForConnector(String name, String newHost);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122007, value = "Queue {0} does not exist on the topic {1}. It was deleted manually probably." , format = Message.Format.MESSAGE_FORMAT)
|
||||
void noQueueOnTopic(String queueName, String name);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122008, value = "XA Recovery can not connect to any ActiveMQ server on recovery {0}" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void recoveryConnectFailed(String s);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122011, value = "error unbinding {0} from JNDI" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void jndiUnbindError(@Cause Exception e, String key);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122012, value = "JMS Server Manager error" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void jmsServerError(@Cause Exception e);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122013, value = "Error in XA Recovery recover" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoverError(@Cause Exception e);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122014, value = "Notified of connection failure in xa recovery connectionFactory for provider {0} will attempt reconnect on next pass",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoverConnectionError(@Cause Exception e, ClientSessionFactory csf);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122015, value = "Can not connect to {0} on auto-generated resource recovery",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoverAutoConnectionError(@Cause Throwable e, XARecoveryConfig csf);
|
||||
|
||||
@LogMessage(level = Logger.Level.DEBUG)
|
||||
@Message(id = 122016, value = "Error in XA Recovery" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoveryError(@Cause Exception e);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122017, value = "Tried to correct invalid \"host\" value \"0.0.0.0\" for \"{0}\" connector, but received an exception.",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToCorrectHost(@Cause Exception e, String name);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 122018, value = "Could not start recovery discovery on {0}, we will retry every recovery scan until the server is available",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void xaRecoveryStartError(XARecoveryConfig e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 124000, value = "key attribute missing for JMS configuration {0}" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void jmsConfigMissingKey(Node e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 124002, value = "Failed to start JMS deployer" , format = Message.Format.MESSAGE_FORMAT)
|
||||
void jmsDeployerStartError(@Cause Exception e);
|
||||
}
|
|
@ -14,15 +14,13 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.arjuna.ats.jta.recovery.XAResourceRecovery;
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
|
||||
|
||||
/**
|
||||
* A XAResourceRecovery instance that can be used to recover any JMS provider.
|
||||
|
@ -52,9 +50,9 @@ import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
|
|||
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
|
||||
* @version <tt>$Revision: 1.1 $</tt>
|
||||
*/
|
||||
public class ActiveMQXAResourceRecovery implements XAResourceRecovery
|
||||
public class ActiveMQXAResourceRecovery
|
||||
{
|
||||
private final boolean trace = ActiveMQJMSServerLogger.LOGGER.isTraceEnabled();
|
||||
private final boolean trace = ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled();
|
||||
|
||||
private boolean hasMore;
|
||||
|
||||
|
@ -64,15 +62,15 @@ public class ActiveMQXAResourceRecovery implements XAResourceRecovery
|
|||
{
|
||||
if (trace)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
|
||||
}
|
||||
}
|
||||
|
||||
public boolean initialise(final String config)
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace(this + " intialise: " + config);
|
||||
}
|
||||
|
||||
String[] configs = config.split(";");
|
||||
|
@ -92,9 +90,9 @@ public class ActiveMQXAResourceRecovery implements XAResourceRecovery
|
|||
|
||||
res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);
|
||||
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace(this + " initialised");
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace(this + " initialised");
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -102,9 +100,9 @@ public class ActiveMQXAResourceRecovery implements XAResourceRecovery
|
|||
|
||||
public boolean hasMoreResources()
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace(this + " hasMoreResources");
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -128,9 +126,9 @@ public class ActiveMQXAResourceRecovery implements XAResourceRecovery
|
|||
|
||||
public XAResource getXAResource()
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace(this + " getXAResource");
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace(this + " getXAResource");
|
||||
}
|
||||
|
||||
return res;
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -30,7 +30,6 @@ import org.apache.activemq.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
|
||||
|
||||
/**
|
||||
* XAResourceWrapper.
|
||||
|
@ -65,9 +64,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
this.xaRecoveryConfigs = xaRecoveryConfigs;
|
||||
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
|
||||
", instance=" +
|
||||
System.identityHashCode(this));
|
||||
}
|
||||
|
@ -77,25 +76,25 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
Xid[] xids = xaResource.recover(flag);
|
||||
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
|
||||
}
|
||||
|
||||
return xids;
|
||||
}
|
||||
catch (XAException e)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.xaRecoverError(e);
|
||||
ActiveMQXARecoveryLogger.LOGGER.xaRecoverError(e);
|
||||
throw check(e);
|
||||
}
|
||||
}
|
||||
|
@ -103,9 +102,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public void commit(final Xid xid, final boolean onePhase) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(true);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -120,9 +119,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public void rollback(final Xid xid) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(true);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Rollback " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -137,9 +136,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public void forget(final Xid xid) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Forget " + xaResource + " xid ");
|
||||
}
|
||||
|
||||
try
|
||||
|
@ -173,9 +172,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public int prepare(final Xid xid) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(true);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("prepare " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -190,9 +189,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public void start(final Xid xid, final int flags) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("start " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -207,9 +206,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public void end(final Xid xid, final int flags) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("end " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -224,9 +223,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public int getTransactionTimeout() throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -241,9 +240,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
public boolean setTransactionTimeout(final int seconds) throws XAException
|
||||
{
|
||||
XAResource xaResource = getDelegate(false);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid ");
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid ");
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -259,14 +258,14 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
if (me.getType() == ActiveMQExceptionType.DISCONNECTED)
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("being disconnected for server shutdown", me);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
|
||||
ActiveMQXARecoveryLogger.LOGGER.xaRecoverConnectionError(me, csf);
|
||||
}
|
||||
close();
|
||||
}
|
||||
|
@ -312,9 +311,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
xae.initCause(error);
|
||||
}
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
|
||||
}
|
||||
throw xae;
|
||||
}
|
||||
|
@ -326,9 +325,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
xae.initCause(error);
|
||||
}
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
|
||||
}
|
||||
throw xae;
|
||||
}
|
||||
|
@ -362,9 +361,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
{
|
||||
continue;
|
||||
}
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
|
||||
}
|
||||
|
||||
ClientSession cs = null;
|
||||
|
@ -402,10 +401,10 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
|
||||
ActiveMQXARecoveryLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug(e.getMessage(), e);
|
||||
}
|
||||
|
||||
try
|
||||
|
@ -415,9 +414,9 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
}
|
||||
catch (Throwable ignored)
|
||||
{
|
||||
if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
|
||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored);
|
||||
ActiveMQXARecoveryLogger.LOGGER.trace(e.getMessage(), ignored);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
|
@ -432,7 +431,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
|
||||
return delegate;
|
||||
}
|
||||
ActiveMQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
|
||||
ActiveMQXARecoveryLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
|
||||
throw new ActiveMQNotConnectedException();
|
||||
}
|
||||
|
||||
|
@ -480,7 +479,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
}
|
||||
catch (Throwable ignorable)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -492,7 +491,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
}
|
||||
catch (Throwable ignorable)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,7 +503,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
}
|
||||
catch (Throwable ignorable)
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
ActiveMQXARecoveryLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -519,7 +518,7 @@ public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureList
|
|||
*/
|
||||
protected XAException check(final XAException e) throws XAException
|
||||
{
|
||||
ActiveMQJMSServerLogger.LOGGER.xaRecoveryError(e);
|
||||
ActiveMQXARecoveryLogger.LOGGER.xaRecoveryError(e);
|
||||
|
||||
|
||||
// If any exception happened, we close the connection so we may start fresh
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.jms.server.recovery;
|
||||
package org.apache.activemq.service.extensions.xa.recovery;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
|
@ -128,8 +128,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge -->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
|
@ -22,13 +22,11 @@ import javax.resource.spi.endpoint.MessageEndpoint;
|
|||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
|
||||
import org.apache.activemq.api.core.client.ClientSession;
|
||||
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||
|
@ -38,11 +36,10 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
|
|||
import org.apache.activemq.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.jms.server.recovery.RecoveryDiscovery;
|
||||
import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.ra.ActiveMQResourceAdapter;
|
||||
import org.apache.activemq.ra.inflow.ActiveMQActivation;
|
||||
import org.apache.activemq.ra.inflow.ActiveMQActivationSpec;
|
||||
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.tests.unit.ra.MessageEndpointFactory;
|
||||
import org.apache.activemq.tests.util.UnitTestCase;
|
||||
import org.apache.activemq.utils.DefaultSensitiveStringCodec;
|
||||
|
@ -707,22 +704,6 @@ public class ResourceAdapterTest extends ActiveMQRATestBase
|
|||
assertTrue(endpoint.released);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryDiscoveryAsKey() throws Exception
|
||||
{
|
||||
Set<RecoveryDiscovery> discoverySet = new HashSet<RecoveryDiscovery>();
|
||||
String factClass = "org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory";
|
||||
TransportConfiguration transportConfig = new TransportConfiguration(factClass, null, "netty");
|
||||
XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
|
||||
null, null);
|
||||
|
||||
RecoveryDiscovery discovery1 = new RecoveryDiscovery(config);
|
||||
RecoveryDiscovery discovery2 = new RecoveryDiscovery(config);
|
||||
assertTrue(discoverySet.add(discovery1));
|
||||
assertFalse(discoverySet.add(discovery2));
|
||||
assertEquals("should have only one in the set", 1, discoverySet.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSecurity()
|
||||
{
|
||||
|
|
|
@ -76,8 +76,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge -->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
|
@ -90,8 +90,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge-->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
|
@ -109,8 +109,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge-->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
|
@ -109,8 +109,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge-->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
|
@ -87,8 +87,8 @@
|
|||
</dependency>
|
||||
<!--this specifically for the JMS Bridge-->
|
||||
<dependency>
|
||||
<groupId>org.jboss</groupId>
|
||||
<artifactId>jboss-transaction-spi</artifactId>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.components</groupId>
|
||||
|
|
Loading…
Reference in New Issue