applied modified version of AMQ-1147 (keeping the implementation classes the same name to avoid issues with existing RA configurations)

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515576 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-03-07 14:14:23 +00:00
parent d7355e7874
commit 3bd948f738
2 changed files with 1315 additions and 0 deletions

View File

@ -0,0 +1,722 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.selector.SelectorParser;
/**
* Configures the inbound JMS consumer specification using ActiveMQ
*
* @org.apache.xbean.XBean element="activationSpec"
*
* @version $Revision$ $Date$
*/
public class ActiveMQActivationSpec implements MessageActivationSpec, Serializable {
private static final long serialVersionUID = -7153087544100459975L;
/** Auto-acknowledge constant for <code>acknowledgeMode</code> property **/
public static final String AUTO_ACKNOWLEDGE_MODE = "Auto-acknowledge";
/** Dups-ok-acknowledge constant for <code>acknowledgeMode</code> property * */
public static final String DUPS_OK_ACKNOWLEDGE_MODE = "Dups-ok-acknowledge";
/** Durable constant for <code>subscriptionDurability</code> property * */
public static final String DURABLE_SUBSCRIPTION = "Durable";
/** NonDurable constant for <code>subscriptionDurability</code> property * */
public static final String NON_DURABLE_SUBSCRIPTION = "NonDurable";
/**
*
*/
public static final int INVALID_ACKNOWLEDGE_MODE = -1;
private transient MessageResourceAdapter resourceAdapter;
private String destinationType;
private String messageSelector;
private String destination;
private String acknowledgeMode = AUTO_ACKNOWLEDGE_MODE;
private String userName;
private String password;
private String clientId;
private String subscriptionName;
private String subscriptionDurability = NON_DURABLE_SUBSCRIPTION;
private String noLocal = "false";
private String useRAManagedTransaction = "false";
private String maxSessions="10";
private String maxMessagesPerSessions="10";
private String enableBatch = "false";
private String maxMessagesPerBatch = "10";
private RedeliveryPolicy redeliveryPolicy;
/**
* @see javax.resource.spi.ActivationSpec#validate()
*/
public void validate() throws InvalidPropertyException {
List errorMessages = new ArrayList();
List propsNotSet = new ArrayList();
try {
if (!isValidDestination(errorMessages))
propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class));
if (!isValidDestinationType(errorMessages))
propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class));
if (!isValidAcknowledgeMode(errorMessages))
propsNotSet.add(new PropertyDescriptor("acknowledgeMode", ActiveMQActivationSpec.class));
if (!isValidSubscriptionDurability(errorMessages))
propsNotSet.add(new PropertyDescriptor("subscriptionDurability", ActiveMQActivationSpec.class));
if (!isValidClientId(errorMessages))
propsNotSet.add(new PropertyDescriptor("clientId", ActiveMQActivationSpec.class));
if (!isValidSubscriptionName(errorMessages))
propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class));
if (!isValidMaxMessagesPerSessions(errorMessages))
propsNotSet.add(new PropertyDescriptor("maxMessagesPerSessions", ActiveMQActivationSpec.class));
if (!isValidMaxSessions(errorMessages))
propsNotSet.add(new PropertyDescriptor("maxSessions", ActiveMQActivationSpec.class));
if (!isValidMessageSelector(errorMessages))
propsNotSet.add(new PropertyDescriptor("messageSelector", ActiveMQActivationSpec.class));
if (!isValidNoLocal(errorMessages))
propsNotSet.add(new PropertyDescriptor("noLocal", ActiveMQActivationSpec.class));
if (!isValidUseRAManagedTransaction(errorMessages))
propsNotSet.add(new PropertyDescriptor("useRAManagedTransaction", ActiveMQActivationSpec.class));
if (!isValidEnableBatch(errorMessages))
propsNotSet.add(new PropertyDescriptor("enableBatch", ActiveMQActivationSpec.class));
if (!isValidMaxMessagesPerBatch(errorMessages))
propsNotSet.add(new PropertyDescriptor("maxMessagesPerBatch", ActiveMQActivationSpec.class));
} catch (IntrospectionException e) {
e.printStackTrace();
}
if (propsNotSet.size() > 0) {
StringBuffer b = new StringBuffer();
b.append("Invalid settings:");
for (Iterator iter = errorMessages.iterator(); iter.hasNext();) {
b.append(" ");
b.append(iter.next());
}
InvalidPropertyException e = new InvalidPropertyException(b.toString());
final PropertyDescriptor[] descriptors = (PropertyDescriptor[]) propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]);
e.setInvalidPropertyDescriptors(descriptors);
throw e;
}
}
private boolean isValidUseRAManagedTransaction(List errorMessages) {
try {
new Boolean(noLocal);
return true;
} catch (Throwable e) {
//
}
errorMessages.add("noLocal must be set to: true or false.");
return false;
}
private boolean isValidNoLocal(List errorMessages) {
try {
new Boolean(noLocal);
return true;
} catch (Throwable e) {
//
}
errorMessages.add("noLocal must be set to: true or false.");
return false;
}
private boolean isValidMessageSelector(List errorMessages) {
try {
if( !isEmpty(messageSelector) ) {
new SelectorParser().parse(messageSelector);
}
return true;
} catch (Throwable e) {
errorMessages.add("messageSelector not set to valid message selector: "+e.getMessage());
return false;
}
}
private boolean isValidMaxSessions(List errorMessages) {
try {
if( Integer.parseInt(maxSessions) > 0 ) {
return true;
}
} catch (NumberFormatException e) {
//
}
errorMessages.add("maxSessions must be set to number > 0");
return false;
}
private boolean isValidMaxMessagesPerSessions(List errorMessages) {
try {
if( Integer.parseInt(maxMessagesPerSessions) > 0 ) {
return true;
}
} catch (NumberFormatException e) {
//
}
errorMessages.add("maxMessagesPerSessions must be set to number > 0");
return false;
}
private boolean isValidMaxMessagesPerBatch(List errorMessages) {
try {
if( Integer.parseInt(maxMessagesPerBatch) > 0 ) {
return true;
}
} catch (NumberFormatException e) {
//
}
errorMessages.add("maxMessagesPerBatch must be set to number > 0");
return false;
}
private boolean isValidEnableBatch(List errorMessages) {
try {
new Boolean(enableBatch);
return true;
} catch (Throwable e) {
//
}
errorMessages.add("enableBatch must be set to: true or false");
return false;
}
/**
* @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
*/
public ResourceAdapter getResourceAdapter() {
return resourceAdapter;
}
/**
* @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter)
*/
public void setResourceAdapter(ResourceAdapter resourceAdapter) throws ResourceException {
//spec section 5.3.3
if (this.resourceAdapter != null) {
throw new ResourceException("ResourceAdapter already set");
}
if (!(resourceAdapter instanceof MessageResourceAdapter)) {
throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName());
}
this.resourceAdapter = (MessageResourceAdapter) resourceAdapter;
}
/////////////////////////////////////////////////////////////////////////
//
// Java Bean getters and setters for this ActivationSpec class.
//
/////////////////////////////////////////////////////////////////////////
/**
* @return Returns the destinationType.
*/
public String getDestinationType() {
if (!isEmpty(destinationType)) {
return destinationType;
}
return null;
}
/**
* @param destinationType The destinationType to set.
*/
public void setDestinationType(String destinationType) {
this.destinationType = destinationType;
}
/**
*
*/
public String getPassword() {
if (!isEmpty(password)) {
return password;
}
return null;
}
/**
*
*/
public void setPassword(String password) {
this.password = password;
}
/**
*
*/
public String getUserName() {
if (!isEmpty(userName)) {
return userName;
}
return null;
}
/**
*
*/
public void setUserName(String userName) {
this.userName = userName;
}
/**
* @return Returns the messageSelector.
*/
public String getMessageSelector() {
if (!isEmpty(messageSelector)) {
return messageSelector;
}
return null;
}
/**
* @param messageSelector The messageSelector to set.
*/
public void setMessageSelector(String messageSelector) {
this.messageSelector = messageSelector;
}
/**
* @return Returns the noLocal.
*/
public String getNoLocal() {
return noLocal;
}
/**
* @param noLocal The noLocal to set.
*/
public void setNoLocal(String noLocal) {
if( noLocal!=null ) {
this.noLocal = noLocal;
}
}
/**
*
*/
public String getAcknowledgeMode() {
if (!isEmpty(acknowledgeMode)) {
return acknowledgeMode;
}
return null;
}
/**
*
*/
public void setAcknowledgeMode(String acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode;
}
/**
*
*/
public String getClientId() {
if (!isEmpty(clientId)) {
return clientId;
}
return null;
}
/**
*
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
*
*/
public String getDestination() {
if (!isEmpty(destination)) {
return destination;
}
return null;
}
/**
*
*/
public void setDestination(String destination) {
this.destination = destination;
}
/**
*
*/
public String getSubscriptionDurability() {
if (!isEmpty(subscriptionDurability)) {
return subscriptionDurability;
}
return null;
}
/**
*
*/
public void setSubscriptionDurability(String subscriptionDurability) {
this.subscriptionDurability = subscriptionDurability;
}
/**
*
*/
public String getSubscriptionName() {
if (!isEmpty(subscriptionName)) {
return subscriptionName;
}
return null;
}
/**
*
*/
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
private boolean isValidSubscriptionName(List errorMessages) {
if( !isDurableSubscription() ? true : subscriptionName != null && subscriptionName.trim().length() > 0 ) {
return true;
}
errorMessages.add("subscriptionName must be set since durable subscription was requested.");
return false;
}
private boolean isValidClientId(List errorMessages) {
if( !isDurableSubscription() ? true : clientId != null && clientId.trim().length() > 0 ) {
return true;
}
errorMessages.add("clientId must be set since durable subscription was requested.");
return false;
}
/**
*
*/
public boolean isDurableSubscription() {
return DURABLE_SUBSCRIPTION.equals(subscriptionDurability);
}
private boolean isValidSubscriptionDurability(List errorMessages) {
// subscriptionDurability only applies to Topics
if ( DURABLE_SUBSCRIPTION.equals(subscriptionDurability) &&
getDestinationType() != null && !Topic.class.getName().equals(getDestinationType())) {
errorMessages.add("subscriptionDurability cannot be set to: "+DURABLE_SUBSCRIPTION+" when destinationType is set to "+
Queue.class.getName()+" as it is only valid when destinationType is set to "+Topic.class.getName()+".");
return false;
}
if (NON_DURABLE_SUBSCRIPTION.equals(subscriptionDurability) || DURABLE_SUBSCRIPTION.equals(subscriptionDurability))
return true;
errorMessages.add("subscriptionDurability must be set to: "+NON_DURABLE_SUBSCRIPTION+" or "+DURABLE_SUBSCRIPTION+".");
return false;
}
private boolean isValidAcknowledgeMode(List errorMessages) {
if (AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) || DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode))
return true;
errorMessages.add("acknowledgeMode must be set to: "+AUTO_ACKNOWLEDGE_MODE+" or "+DUPS_OK_ACKNOWLEDGE_MODE+".");
return false;
}
private boolean isValidDestinationType(List errorMessages) {
if (Queue.class.getName().equals(destinationType) || Topic.class.getName().equals(destinationType))
return true;
errorMessages.add("destinationType must be set to: "+Queue.class.getName()+" or "+Topic.class.getName()+".");
return false;
}
private boolean isValidDestination(List errorMessages) {
if(!(destination == null || destination.equals("")))
return true;
errorMessages.add("destination is a required field and must be set to the destination name.");
return false;
}
private boolean isEmpty(String value) {
return value == null || "".equals(value.trim());
}
/**
*
*/
@Override
public String toString() {
return "ActiveMQActivationSpec{" +
"acknowledgeMode='" + acknowledgeMode + "'" +
", destinationType='" + destinationType + "'" +
", messageSelector='" + messageSelector + "'" +
", destination='" + destination + "'" +
", clientId='" + clientId + "'" +
", subscriptionName='" + subscriptionName + "'" +
", subscriptionDurability='" + subscriptionDurability + "'" +
"}";
}
public int getAcknowledgeModeForSession() {
if( AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
return Session.AUTO_ACKNOWLEDGE;
} else if( DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
return Session.DUPS_OK_ACKNOWLEDGE;
} else {
return INVALID_ACKNOWLEDGE_MODE;
}
}
/**
* A helper method mostly for use in Dependency Injection containers
* which allows you to customize the destination and destinationType properties
* from a single ActiveMQDestination POJO
*/
public void setActiveMQDestination(ActiveMQDestination destination) {
setDestination(destination.getPhysicalName());
if (destination instanceof Queue) {
setDestinationType(Queue.class.getName());
}
else {
setDestinationType(Topic.class.getName());
}
}
/**
*
*/
public ActiveMQDestination createDestination() {
if( isEmpty(destinationType) || isEmpty(destination) )
return null;
ActiveMQDestination dest = null;
if (Queue.class.getName().equals(destinationType)) {
dest = new ActiveMQQueue(destination);
} else if (Topic.class.getName().equals(destinationType)) {
dest = new ActiveMQTopic(destination);
} else {
assert false : "Execution should never reach here";
}
return dest;
}
public String getMaxMessagesPerSessions() {
return maxMessagesPerSessions.toString();
}
/**
*
*/
public void setMaxMessagesPerSessions(String maxMessagesPerSessions) {
if( maxMessagesPerSessions!=null ) {
this.maxMessagesPerSessions = maxMessagesPerSessions;
}
}
/**
*
*/
public String getMaxSessions() {
return maxSessions;
}
/**
*
*/
public void setMaxSessions(String maxSessions) {
if( maxSessions!=null ) {
this.maxSessions = maxSessions;
}
}
/**
*
*/
public String getUseRAManagedTransaction() {
return useRAManagedTransaction;
}
/**
*
*/
public void setUseRAManagedTransaction(String useRAManagedTransaction) {
if( useRAManagedTransaction!=null ) {
this.useRAManagedTransaction = useRAManagedTransaction;
}
}
/**
*
*/
public int getMaxMessagesPerSessionsIntValue() {
return Integer.parseInt(maxMessagesPerSessions);
}
/**
*
*/
public int getMaxSessionsIntValue() {
return Integer.parseInt(maxSessions);
}
public boolean isUseRAManagedTransactionEnabled() {
return new Boolean(useRAManagedTransaction).booleanValue();
}
/**
*
*/
public boolean getNoLocalBooleanValue() {
return new Boolean(noLocal).booleanValue();
}
public String getEnableBatch() {
return enableBatch;
}
/**
*
*/
public void setEnableBatch(String enableBatch) {
if (enableBatch != null) {
this.enableBatch = enableBatch;
}
}
public boolean getEnableBatchBooleanValue() {
return new Boolean(enableBatch).booleanValue();
}
public int getMaxMessagesPerBatchIntValue() {
return Integer.parseInt(maxMessagesPerBatch);
}
public String getMaxMessagesPerBatch() {
return maxMessagesPerBatch.toString();
}
/**
*
*/
public void setMaxMessagesPerBatch(String maxMessagesPerBatch) {
if (maxMessagesPerBatch != null) {
this.maxMessagesPerBatch = maxMessagesPerBatch;
}
}
/**
*
*/
public short getBackOffMultiplier() {
if (redeliveryPolicy == null) {
return 0;
}
return redeliveryPolicy.getBackOffMultiplier();
}
/**
*
*/
public long getInitialRedeliveryDelay() {
if (redeliveryPolicy == null) {
return 0;
}
return redeliveryPolicy.getInitialRedeliveryDelay();
}
/**
*
*/
public int getMaximumRedeliveries() {
if (redeliveryPolicy == null) {
return 0;
}
return redeliveryPolicy.getMaximumRedeliveries();
}
/**
*
*/
public boolean isUseExponentialBackOff() {
if (redeliveryPolicy == null) {
return false;
}
return redeliveryPolicy.isUseExponentialBackOff();
}
/**
*
*/
public void setBackOffMultiplier(short backOffMultiplier) {
lazyCreateRedeliveryPolicy().setBackOffMultiplier(backOffMultiplier);
}
/**
*
*/
public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
lazyCreateRedeliveryPolicy().setInitialRedeliveryDelay(initialRedeliveryDelay);
}
/**
*
*/
public void setMaximumRedeliveries(int maximumRedeliveries) {
lazyCreateRedeliveryPolicy().setMaximumRedeliveries(maximumRedeliveries);
}
/**
*
*/
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
lazyCreateRedeliveryPolicy().setUseExponentialBackOff(useExponentialBackOff);
}
// don't use getter to avoid causing introspection errors in containers
/**
*
*/
public RedeliveryPolicy redeliveryPolicy() {
return redeliveryPolicy;
}
protected RedeliveryPolicy lazyCreateRedeliveryPolicy() {
if (redeliveryPolicy == null) {
redeliveryPolicy = new RedeliveryPolicy();
}
return redeliveryPolicy;
}
}

View File

@ -0,0 +1,593 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
* and deliver messages to those end points using the connection configure in the
* resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
*
* @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
* description="The JCA Resource Adaptor for ActiveMQ"
*
* @version $Revision$
*/
public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable {
private static final long serialVersionUID = -5417363537865649130L;
private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
private final HashMap endpointWorkers = new HashMap();
private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
private BootstrapContext bootstrapContext;
private String brokerXmlConfig;
private BrokerService broker;
private ActiveMQConnectionFactory connectionFactory;
/**
*
*/
public ActiveMQResourceAdapter() {
super();
}
/**
* @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
*/
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) {
try {
broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
broker.start();
} catch (Throwable e) {
throw new ResourceAdapterInternalException("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e);
}
}
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
*/
public ActiveMQConnection makeConnection() throws JMSException {
if (connectionFactory != null) {
return makeConnection(info, connectionFactory);
}
return makeConnection(info);
}
/**
*/
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
return makeConnection(info, connectionFactory);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo, org.apache.activemq.ActiveMQConnectionFactory)
*/
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException {
String userName = info.getUserName();
String password = info.getPassword();
ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
String clientId = info.getClientid();
if (clientId != null && clientId.length() > 0) {
physicalConnection.setClientID(clientId);
}
return physicalConnection;
}
/**
* @param activationSpec
*/
public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
String password = defaultValue(activationSpec.getPassword(), info.getPassword());
String clientId = activationSpec.getClientId();
if (clientId != null) {
connectionFactory.setClientID(clientId);
}
else {
if (activationSpec.isDurableSubscription()) {
log.warn("No clientID specified for durable subscription: " + activationSpec);
}
}
ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
// have we configured a redelivery policy
RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
if (redeliveryPolicy != null) {
physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
}
return physicalConnection;
}
/**
* @param info
* @throws JMSException
* @throws URISyntaxException
*/
synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException {
ActiveMQConnectionFactory factory = connectionFactory;
if (factory != null && info.isConnectionFactoryConfigured()) {
factory = factory.copy();
}
else if (factory == null) {
factory = new ActiveMQConnectionFactory();
}
info.configure(factory);
return factory;
}
private String defaultValue(String value, String defaultValue) {
if (value != null)
return value;
return defaultValue;
}
/**
* @see javax.resource.spi.ResourceAdapter#stop()
*/
public void stop() {
while (endpointWorkers.size() > 0) {
ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
}
if (broker != null) {
ServiceSupport.dispose(broker);
broker = null;
}
this.bootstrapContext = null;
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
*/
public BootstrapContext getBootstrapContext() {
return bootstrapContext;
}
/**
* @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
* javax.resource.spi.ActivationSpec)
*/
public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
throws ResourceException {
// spec section 5.3.3
if (!equals(activationSpec.getResourceAdapter())) {
throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
}
if (!(activationSpec instanceof MessageActivationSpec)) {
throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
}
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
(MessageActivationSpec) activationSpec);
// This is weird.. the same endpoint activated twice.. must be a
// container error.
if (endpointWorkers.containsKey(key)) {
throw new IllegalStateException("Endpoint previously activated");
}
ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
endpointWorkers.put(key, worker);
worker.start();
}
/**
* @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
* javax.resource.spi.ActivationSpec)
*/
public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
if (activationSpec instanceof MessageActivationSpec) {
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec) activationSpec);
ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key);
if (worker == null) {
// This is weird.. that endpoint was not activated.. oh well..
// this method
// does not throw exceptions so just return.
return;
}
try {
worker.stop();
} catch (InterruptedException e) {
// We interrupted.. we won't throw an exception but will stop
// waiting for the worker
// to stop.. we tried our best. Keep trying to interrupt the
// thread.
Thread.currentThread().interrupt();
}
}
}
/**
* We only connect to one resource manager per ResourceAdapter instance, so
* any ActivationSpec will return the same XAResource.
*
* @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
*/
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
Connection connection = null;
try {
connection = makeConnection();
if (connection instanceof XAConnection) {
XASession session = ((XAConnection) connection).createXASession();
XAResource xaResource = session.getXAResource();
return new XAResource[] { xaResource };
}
return new XAResource[] {};
} catch (JMSException e) {
throw new ResourceException(e);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
//
}
}
}
// ///////////////////////////////////////////////////////////////////////
//
// Java Bean getters and setters for this ResourceAdapter class.
//
// ///////////////////////////////////////////////////////////////////////
/**
* @return client id
*/
public String getClientid() {
return emptyToNull(info.getClientid());
}
/**
* @return password
*/
public String getPassword() {
return emptyToNull(info.getPassword());
}
/**
* @return server URL
*/
public String getServerUrl() {
return info.getServerUrl();
}
/**
* @return user name
*/
public String getUserName() {
return emptyToNull(info.getUserName());
}
/**
* @param clientid
*/
public void setClientid(String clientid) {
info.setClientid(clientid);
}
/**
* @param password
*/
public void setPassword(String password) {
info.setPassword(password);
}
/**
* @param url
*/
public void setServerUrl(String url) {
info.setServerUrl(url);
}
/**
* @param userid
*/
public void setUserName(String userid) {
info.setUserName(userid);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
*/
public String getBrokerXmlConfig() {
return brokerXmlConfig;
}
/**
* Sets the <a href="http://activemq.org/Xml+Configuration">XML
* configuration file </a> used to configure the ActiveMQ broker via Spring
* if using embedded mode.
*
* @param brokerXmlConfig
* is the filename which is assumed to be on the classpath unless
* a URL is specified. So a value of <code>foo/bar.xml</code>
* would be assumed to be on the classpath whereas
* <code>file:dir/file.xml</code> would use the file system.
* Any valid URL string is supported.
*/
public void setBrokerXmlConfig(String brokerXmlConfig) {
this.brokerXmlConfig=brokerXmlConfig;
}
/**
* @return durable topic prefetch
*/
public Integer getDurableTopicPrefetch() {
return info.getDurableTopicPrefetch();
}
/**
* @return initial redelivery delay
*/
public Long getInitialRedeliveryDelay() {
return info.getInitialRedeliveryDelay();
}
/**
* @return input stream prefetch
*/
public Integer getInputStreamPrefetch() {
return info.getInputStreamPrefetch();
}
/**
* @return maximum redeliveries
*/
public Integer getMaximumRedeliveries() {
return info.getMaximumRedeliveries();
}
/**
* @return queue browser prefetch
*/
public Integer getQueueBrowserPrefetch() {
return info.getQueueBrowserPrefetch();
}
/**
* @return queue prefetch
*/
public Integer getQueuePrefetch() {
return info.getQueuePrefetch();
}
/**
* @return redelivery backoff multiplier
*/
public Short getRedeliveryBackOffMultiplier() {
return info.getRedeliveryBackOffMultiplier();
}
/**
* @return redelivery use exponential backoff
*/
public Boolean getRedeliveryUseExponentialBackOff() {
return info.getRedeliveryUseExponentialBackOff();
}
/**
* @return topic prefetch
*/
public Integer getTopicPrefetch() {
return info.getTopicPrefetch();
}
/**
* @return use inbound session enabled
*/
public boolean isUseInboundSessionEnabled() {
return info.isUseInboundSessionEnabled();
}
/**
* @param i
*/
public void setAllPrefetchValues(Integer i) {
info.setAllPrefetchValues(i);
}
/**
* @param durableTopicPrefetch
*/
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
info.setDurableTopicPrefetch(durableTopicPrefetch);
}
/**
* @param value
*/
public void setInitialRedeliveryDelay(Long value) {
info.setInitialRedeliveryDelay(value);
}
/**
* @param inputStreamPrefetch
*/
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
info.setInputStreamPrefetch(inputStreamPrefetch);
}
/**
* @param value
*/
public void setMaximumRedeliveries(Integer value) {
info.setMaximumRedeliveries(value);
}
/**
* @param queueBrowserPrefetch
*/
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
}
/**
* @param queuePrefetch
*/
public void setQueuePrefetch(Integer queuePrefetch) {
info.setQueuePrefetch(queuePrefetch);
}
/**
* @param value
*/
public void setRedeliveryBackOffMultiplier(Short value) {
info.setRedeliveryBackOffMultiplier(value);
}
/**
* @param value
*/
public void setRedeliveryUseExponentialBackOff(Boolean value) {
info.setRedeliveryUseExponentialBackOff(value);
}
/**
* @param topicPrefetch
*/
public void setTopicPrefetch(Integer topicPrefetch) {
info.setTopicPrefetch(topicPrefetch);
}
/**
* @return Returns the info.
*/
public ActiveMQConnectionRequestInfo getInfo() {
return info;
}
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof MessageResourceAdapter)) {
return false;
}
final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter) o;
if (!info.equals(activeMQResourceAdapter.getInfo())) {
return false;
}
if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig()) ) {
return false;
}
return true;
}
private boolean notEqual(Object o1, Object o2) {
return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
}
/**
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
int result;
result = info.hashCode();
if( brokerXmlConfig !=null ) {
result ^= brokerXmlConfig.hashCode();
}
return result;
}
private String emptyToNull(String value) {
if (value == null || value.length() == 0) {
return null;
}
return value;
}
/**
* @return use inbound session
*/
public Boolean getUseInboundSession() {
return info.getUseInboundSession();
}
/**
* @param useInboundSession
*/
public void setUseInboundSession(Boolean useInboundSession) {
info.setUseInboundSession(useInboundSession);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory()
*/
public ActiveMQConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging.
* Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery
* and prefetch policies; the properties on the connectionFactory will be used instead.
*/
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
}