Modifying ActiveMQResourceAdapter to implement Serializable to support
WebLogic.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-24 13:02:02 +00:00
parent 11579bb918
commit d983d525ce
1 changed files with 27 additions and 17 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.ra;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
@ -28,8 +29,8 @@ import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
@ -44,23 +45,24 @@ import org.slf4j.LoggerFactory;
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
* and deliver messages to those end points using the connection configure in
* the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
*
*
* @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
* description="The JCA Resource Adaptor for ActiveMQ"
*
*
*/
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements Serializable, MessageResourceAdapter {
private static final long serialVersionUID = 360805587169336959L;
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
private BootstrapContext bootstrapContext;
private transient BootstrapContext bootstrapContext;
private String brokerXmlConfig;
private BrokerService broker;
private Thread brokerStartThread;
private transient BrokerService broker;
private transient Thread brokerStartThread;
private ActiveMQConnectionFactory connectionFactory;
/**
*
*
*/
public ActiveMQResourceAdapter() {
super();
@ -69,6 +71,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
/**
* @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
*/
@Override
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
@ -80,7 +83,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
synchronized( ActiveMQResourceAdapter.this ) {
broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
}
@ -97,13 +100,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
};
brokerStartThread.setDaemon(true);
brokerStartThread.start();
// Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
try {
brokerStartThread.join(1000*5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
@ -118,6 +121,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
/**
* @param activationSpec
*/
@Override
public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
ActiveMQConnectionFactory cf = getConnectionFactory();
if (cf == null) {
@ -146,6 +150,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
/**
* @see javax.resource.spi.ResourceAdapter#stop()
*/
@Override
public void stop() {
synchronized (endpointWorkers) {
while (endpointWorkers.size() > 0) {
@ -153,7 +158,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
}
}
synchronized( this ) {
if (broker != null) {
if( brokerStartThread.isAlive() ) {
@ -163,13 +168,14 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
broker = null;
}
}
this.bootstrapContext = null;
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
*/
@Override
public BootstrapContext getBootstrapContext() {
return bootstrapContext;
}
@ -178,6 +184,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
* javax.resource.spi.ActivationSpec)
*/
@Override
public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
// spec section 5.3.3
@ -206,6 +213,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
* javax.resource.spi.ActivationSpec)
*/
@Override
public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
if (activationSpec instanceof MessageActivationSpec) {
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
@ -236,9 +244,10 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
/**
* We only connect to one resource manager per ResourceAdapter instance, so
* any ActivationSpec will return the same XAResource.
*
*
* @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
*/
@Override
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
try {
return new XAResource[]{
@ -376,6 +385,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
*/
@Override
public String getBrokerXmlConfig() {
return brokerXmlConfig;
}
@ -384,7 +394,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* Sets the <a href="http://activemq.org/Xml+Configuration">XML
* configuration file </a> used to configure the ActiveMQ broker via Spring
* if using embedded mode.
*
*
* @param brokerXmlConfig is the filename which is assumed to be on the
* classpath unless a URL is specified. So a value of
* <code>foo/bar.xml</code> would be assumed to be on the