diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java new file mode 100644 index 0000000000..1694c1a721 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java @@ -0,0 +1,722 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.ra; + +import java.beans.IntrospectionException; +import java.beans.PropertyDescriptor; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.resource.ResourceException; +import javax.resource.spi.InvalidPropertyException; +import javax.resource.spi.ResourceAdapter; + +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.selector.SelectorParser; + +/** + * Configures the inbound JMS consumer specification using ActiveMQ + * + * @org.apache.xbean.XBean element="activationSpec" + * + * @version $Revision$ $Date$ + */ +public class ActiveMQActivationSpec implements MessageActivationSpec, Serializable { + + private static final long serialVersionUID = -7153087544100459975L; + + /** Auto-acknowledge constant for acknowledgeMode property **/ + public static final String AUTO_ACKNOWLEDGE_MODE = "Auto-acknowledge"; + /** Dups-ok-acknowledge constant for acknowledgeMode property * */ + public static final String DUPS_OK_ACKNOWLEDGE_MODE = "Dups-ok-acknowledge"; + /** Durable constant for subscriptionDurability property * */ + public static final String DURABLE_SUBSCRIPTION = "Durable"; + /** NonDurable constant for subscriptionDurability property * */ + public static final String NON_DURABLE_SUBSCRIPTION = "NonDurable"; + + /** + * + */ + public static final int INVALID_ACKNOWLEDGE_MODE = -1; + + private transient MessageResourceAdapter resourceAdapter; + private String destinationType; + private String messageSelector; + private String destination; + private String acknowledgeMode = AUTO_ACKNOWLEDGE_MODE; + private String userName; + private String password; + private String clientId; + private String subscriptionName; + private String subscriptionDurability = NON_DURABLE_SUBSCRIPTION; + private String noLocal = "false"; + private String useRAManagedTransaction = "false"; + private String maxSessions="10"; + private String maxMessagesPerSessions="10"; + private String enableBatch = "false"; + private String maxMessagesPerBatch = "10"; + private RedeliveryPolicy redeliveryPolicy; + + + /** + * @see javax.resource.spi.ActivationSpec#validate() + */ + public void validate() throws InvalidPropertyException { + List errorMessages = new ArrayList(); + List propsNotSet = new ArrayList(); + try { + if (!isValidDestination(errorMessages)) + propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class)); + if (!isValidDestinationType(errorMessages)) + propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class)); + if (!isValidAcknowledgeMode(errorMessages)) + propsNotSet.add(new PropertyDescriptor("acknowledgeMode", ActiveMQActivationSpec.class)); + if (!isValidSubscriptionDurability(errorMessages)) + propsNotSet.add(new PropertyDescriptor("subscriptionDurability", ActiveMQActivationSpec.class)); + if (!isValidClientId(errorMessages)) + propsNotSet.add(new PropertyDescriptor("clientId", ActiveMQActivationSpec.class)); + if (!isValidSubscriptionName(errorMessages)) + propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class)); + if (!isValidMaxMessagesPerSessions(errorMessages)) + propsNotSet.add(new PropertyDescriptor("maxMessagesPerSessions", ActiveMQActivationSpec.class)); + if (!isValidMaxSessions(errorMessages)) + propsNotSet.add(new PropertyDescriptor("maxSessions", ActiveMQActivationSpec.class)); + if (!isValidMessageSelector(errorMessages)) + propsNotSet.add(new PropertyDescriptor("messageSelector", ActiveMQActivationSpec.class)); + if (!isValidNoLocal(errorMessages)) + propsNotSet.add(new PropertyDescriptor("noLocal", ActiveMQActivationSpec.class)); + if (!isValidUseRAManagedTransaction(errorMessages)) + propsNotSet.add(new PropertyDescriptor("useRAManagedTransaction", ActiveMQActivationSpec.class)); + if (!isValidEnableBatch(errorMessages)) + propsNotSet.add(new PropertyDescriptor("enableBatch", ActiveMQActivationSpec.class)); + if (!isValidMaxMessagesPerBatch(errorMessages)) + propsNotSet.add(new PropertyDescriptor("maxMessagesPerBatch", ActiveMQActivationSpec.class)); + + + } catch (IntrospectionException e) { + e.printStackTrace(); + } + + if (propsNotSet.size() > 0) { + StringBuffer b = new StringBuffer(); + b.append("Invalid settings:"); + for (Iterator iter = errorMessages.iterator(); iter.hasNext();) { + b.append(" "); + b.append(iter.next()); + } + InvalidPropertyException e = new InvalidPropertyException(b.toString()); + final PropertyDescriptor[] descriptors = (PropertyDescriptor[]) propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]); + e.setInvalidPropertyDescriptors(descriptors); + throw e; + } + } + + private boolean isValidUseRAManagedTransaction(List errorMessages) { + try { + new Boolean(noLocal); + return true; + } catch (Throwable e) { + // + } + errorMessages.add("noLocal must be set to: true or false."); + return false; + } + + private boolean isValidNoLocal(List errorMessages) { + try { + new Boolean(noLocal); + return true; + } catch (Throwable e) { + // + } + errorMessages.add("noLocal must be set to: true or false."); + return false; + } + + private boolean isValidMessageSelector(List errorMessages) { + try { + if( !isEmpty(messageSelector) ) { + new SelectorParser().parse(messageSelector); + } + return true; + } catch (Throwable e) { + errorMessages.add("messageSelector not set to valid message selector: "+e.getMessage()); + return false; + } + } + + private boolean isValidMaxSessions(List errorMessages) { + try { + if( Integer.parseInt(maxSessions) > 0 ) { + return true; + } + } catch (NumberFormatException e) { + // + } + errorMessages.add("maxSessions must be set to number > 0"); + return false; + } + + private boolean isValidMaxMessagesPerSessions(List errorMessages) { + try { + if( Integer.parseInt(maxMessagesPerSessions) > 0 ) { + return true; + } + } catch (NumberFormatException e) { + // + } + errorMessages.add("maxMessagesPerSessions must be set to number > 0"); + return false; + } + + private boolean isValidMaxMessagesPerBatch(List errorMessages) { + try { + if( Integer.parseInt(maxMessagesPerBatch) > 0 ) { + return true; + } + } catch (NumberFormatException e) { + // + } + errorMessages.add("maxMessagesPerBatch must be set to number > 0"); + return false; + } + + private boolean isValidEnableBatch(List errorMessages) { + try { + new Boolean(enableBatch); + return true; + } catch (Throwable e) { + // + } + errorMessages.add("enableBatch must be set to: true or false"); + return false; + } + + /** + * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter() + */ + public ResourceAdapter getResourceAdapter() { + return resourceAdapter; + } + + /** + * @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter) + */ + public void setResourceAdapter(ResourceAdapter resourceAdapter) throws ResourceException { + //spec section 5.3.3 + if (this.resourceAdapter != null) { + throw new ResourceException("ResourceAdapter already set"); + } + if (!(resourceAdapter instanceof MessageResourceAdapter)) { + throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName()); + } + this.resourceAdapter = (MessageResourceAdapter) resourceAdapter; + } + + + ///////////////////////////////////////////////////////////////////////// + // + // Java Bean getters and setters for this ActivationSpec class. + // + ///////////////////////////////////////////////////////////////////////// + /** + * @return Returns the destinationType. + */ + public String getDestinationType() { + if (!isEmpty(destinationType)) { + return destinationType; + } + return null; + } + + /** + * @param destinationType The destinationType to set. + */ + public void setDestinationType(String destinationType) { + this.destinationType = destinationType; + } + + /** + * + */ + public String getPassword() { + if (!isEmpty(password)) { + return password; + } + return null; + } + + /** + * + */ + public void setPassword(String password) { + this.password = password; + } + + /** + * + */ + public String getUserName() { + if (!isEmpty(userName)) { + return userName; + } + return null; + } + + /** + * + */ + public void setUserName(String userName) { + this.userName = userName; + } + + /** + * @return Returns the messageSelector. + */ + public String getMessageSelector() { + if (!isEmpty(messageSelector)) { + return messageSelector; + } + return null; + } + + /** + * @param messageSelector The messageSelector to set. + */ + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + /** + * @return Returns the noLocal. + */ + public String getNoLocal() { + return noLocal; + } + + /** + * @param noLocal The noLocal to set. + */ + public void setNoLocal(String noLocal) { + if( noLocal!=null ) { + this.noLocal = noLocal; + } + } + + /** + * + */ + public String getAcknowledgeMode() { + if (!isEmpty(acknowledgeMode)) { + return acknowledgeMode; + } + return null; + } + + /** + * + */ + public void setAcknowledgeMode(String acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + /** + * + */ + public String getClientId() { + if (!isEmpty(clientId)) { + return clientId; + } + return null; + } + + /** + * + */ + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** + * + */ + public String getDestination() { + if (!isEmpty(destination)) { + return destination; + } + return null; + } + + /** + * + */ + public void setDestination(String destination) { + this.destination = destination; + } + + /** + * + */ + public String getSubscriptionDurability() { + if (!isEmpty(subscriptionDurability)) { + return subscriptionDurability; + } + return null; + } + + /** + * + */ + public void setSubscriptionDurability(String subscriptionDurability) { + this.subscriptionDurability = subscriptionDurability; + } + + /** + * + */ + public String getSubscriptionName() { + if (!isEmpty(subscriptionName)) { + return subscriptionName; + } + return null; + } + + /** + * + */ + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + private boolean isValidSubscriptionName(List errorMessages) { + if( !isDurableSubscription() ? true : subscriptionName != null && subscriptionName.trim().length() > 0 ) { + return true; + } + errorMessages.add("subscriptionName must be set since durable subscription was requested."); + return false; + } + + private boolean isValidClientId(List errorMessages) { + if( !isDurableSubscription() ? true : clientId != null && clientId.trim().length() > 0 ) { + return true; + } + errorMessages.add("clientId must be set since durable subscription was requested."); + return false; + } + + /** + * + */ + public boolean isDurableSubscription() { + return DURABLE_SUBSCRIPTION.equals(subscriptionDurability); + } + + private boolean isValidSubscriptionDurability(List errorMessages) { + // subscriptionDurability only applies to Topics + if ( DURABLE_SUBSCRIPTION.equals(subscriptionDurability) && + getDestinationType() != null && !Topic.class.getName().equals(getDestinationType())) { + errorMessages.add("subscriptionDurability cannot be set to: "+DURABLE_SUBSCRIPTION+" when destinationType is set to "+ + Queue.class.getName()+" as it is only valid when destinationType is set to "+Topic.class.getName()+"."); + return false; + } + if (NON_DURABLE_SUBSCRIPTION.equals(subscriptionDurability) || DURABLE_SUBSCRIPTION.equals(subscriptionDurability)) + return true; + errorMessages.add("subscriptionDurability must be set to: "+NON_DURABLE_SUBSCRIPTION+" or "+DURABLE_SUBSCRIPTION+"."); + return false; + } + + private boolean isValidAcknowledgeMode(List errorMessages) { + if (AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) || DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode)) + return true; + errorMessages.add("acknowledgeMode must be set to: "+AUTO_ACKNOWLEDGE_MODE+" or "+DUPS_OK_ACKNOWLEDGE_MODE+"."); + return false; + } + + private boolean isValidDestinationType(List errorMessages) { + if (Queue.class.getName().equals(destinationType) || Topic.class.getName().equals(destinationType)) + return true; + errorMessages.add("destinationType must be set to: "+Queue.class.getName()+" or "+Topic.class.getName()+"."); + return false; + } + + private boolean isValidDestination(List errorMessages) { + if(!(destination == null || destination.equals(""))) + return true; + errorMessages.add("destination is a required field and must be set to the destination name."); + return false; + } + + private boolean isEmpty(String value) { + return value == null || "".equals(value.trim()); + } + + /** + * + */ + @Override +public String toString() { + return "ActiveMQActivationSpec{" + + "acknowledgeMode='" + acknowledgeMode + "'" + + ", destinationType='" + destinationType + "'" + + ", messageSelector='" + messageSelector + "'" + + ", destination='" + destination + "'" + + ", clientId='" + clientId + "'" + + ", subscriptionName='" + subscriptionName + "'" + + ", subscriptionDurability='" + subscriptionDurability + "'" + + "}"; + } + + public int getAcknowledgeModeForSession() { + if( AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) { + return Session.AUTO_ACKNOWLEDGE; + } else if( DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) { + return Session.DUPS_OK_ACKNOWLEDGE; + } else { + return INVALID_ACKNOWLEDGE_MODE; + } + } + + /** + * A helper method mostly for use in Dependency Injection containers + * which allows you to customize the destination and destinationType properties + * from a single ActiveMQDestination POJO + */ + public void setActiveMQDestination(ActiveMQDestination destination) { + setDestination(destination.getPhysicalName()); + if (destination instanceof Queue) { + setDestinationType(Queue.class.getName()); + } + else { + setDestinationType(Topic.class.getName()); + } + } + + /** + * + */ + public ActiveMQDestination createDestination() { + if( isEmpty(destinationType) || isEmpty(destination) ) + return null; + + ActiveMQDestination dest = null; + if (Queue.class.getName().equals(destinationType)) { + dest = new ActiveMQQueue(destination); + } else if (Topic.class.getName().equals(destinationType)) { + dest = new ActiveMQTopic(destination); + } else { + assert false : "Execution should never reach here"; + } + return dest; + } + + public String getMaxMessagesPerSessions() { + return maxMessagesPerSessions.toString(); + } + + /** + * + */ + public void setMaxMessagesPerSessions(String maxMessagesPerSessions) { + if( maxMessagesPerSessions!=null ) { + this.maxMessagesPerSessions = maxMessagesPerSessions; + } + } + + /** + * + */ + public String getMaxSessions() { + return maxSessions; + } + + /** + * + */ + public void setMaxSessions(String maxSessions) { + if( maxSessions!=null ) { + this.maxSessions = maxSessions; + } + } + + /** + * + */ + public String getUseRAManagedTransaction() { + return useRAManagedTransaction; + } + + /** + * + */ + public void setUseRAManagedTransaction(String useRAManagedTransaction) { + if( useRAManagedTransaction!=null ) { + this.useRAManagedTransaction = useRAManagedTransaction; + } + } + + /** + * + */ + public int getMaxMessagesPerSessionsIntValue() { + return Integer.parseInt(maxMessagesPerSessions); + } + + /** + * + */ + public int getMaxSessionsIntValue() { + return Integer.parseInt(maxSessions); + } + + public boolean isUseRAManagedTransactionEnabled() { + return new Boolean(useRAManagedTransaction).booleanValue(); + } + + /** + * + */ + public boolean getNoLocalBooleanValue() { + return new Boolean(noLocal).booleanValue(); + } + + public String getEnableBatch() { + return enableBatch; + } + + /** + * + */ + public void setEnableBatch(String enableBatch) { + if (enableBatch != null) { + this.enableBatch = enableBatch; + } + } + + public boolean getEnableBatchBooleanValue() { + return new Boolean(enableBatch).booleanValue(); + } + + public int getMaxMessagesPerBatchIntValue() { + return Integer.parseInt(maxMessagesPerBatch); + } + + public String getMaxMessagesPerBatch() { + return maxMessagesPerBatch.toString(); + } + + /** + * + */ + public void setMaxMessagesPerBatch(String maxMessagesPerBatch) { + if (maxMessagesPerBatch != null) { + this.maxMessagesPerBatch = maxMessagesPerBatch; + } + } + + /** + * + */ + public short getBackOffMultiplier() { + if (redeliveryPolicy == null) { + return 0; + } + return redeliveryPolicy.getBackOffMultiplier(); + } + + /** + * + */ + public long getInitialRedeliveryDelay() { + if (redeliveryPolicy == null) { + return 0; + } + return redeliveryPolicy.getInitialRedeliveryDelay(); + } + + /** + * + */ + public int getMaximumRedeliveries() { + if (redeliveryPolicy == null) { + return 0; + } + return redeliveryPolicy.getMaximumRedeliveries(); + } + + /** + * + */ + public boolean isUseExponentialBackOff() { + if (redeliveryPolicy == null) { + return false; + } + return redeliveryPolicy.isUseExponentialBackOff(); + } + + /** + * + */ + public void setBackOffMultiplier(short backOffMultiplier) { + lazyCreateRedeliveryPolicy().setBackOffMultiplier(backOffMultiplier); + } + + /** + * + */ + public void setInitialRedeliveryDelay(long initialRedeliveryDelay) { + lazyCreateRedeliveryPolicy().setInitialRedeliveryDelay(initialRedeliveryDelay); + } + + /** + * + */ + public void setMaximumRedeliveries(int maximumRedeliveries) { + lazyCreateRedeliveryPolicy().setMaximumRedeliveries(maximumRedeliveries); + } + + /** + * + */ + public void setUseExponentialBackOff(boolean useExponentialBackOff) { + lazyCreateRedeliveryPolicy().setUseExponentialBackOff(useExponentialBackOff); + } + + // don't use getter to avoid causing introspection errors in containers + /** + * + */ + public RedeliveryPolicy redeliveryPolicy() { + return redeliveryPolicy; + } + + protected RedeliveryPolicy lazyCreateRedeliveryPolicy() { + if (redeliveryPolicy == null) { + redeliveryPolicy = new RedeliveryPolicy(); + } + return redeliveryPolicy; + } +} + diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java new file mode 100644 index 0000000000..04c76cefdd --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -0,0 +1,593 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.ra; + +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.resource.NotSupportedException; +import javax.resource.ResourceException; +import javax.resource.spi.ActivationSpec; +import javax.resource.spi.BootstrapContext; +import javax.resource.spi.ResourceAdapterInternalException; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.xa.XAResource; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.ServiceSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Knows how to connect to one ActiveMQ server. It can then activate endpoints + * and deliver messages to those end points using the connection configure in the + * resource adapter.

Must override equals and hashCode (JCA spec 16.4) + * + * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true" + * description="The JCA Resource Adaptor for ActiveMQ" + * + * @version $Revision$ + */ +public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable { + + private static final long serialVersionUID = -5417363537865649130L; + private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class); + + private final HashMap endpointWorkers = new HashMap(); + private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); + + private BootstrapContext bootstrapContext; + private String brokerXmlConfig; + private BrokerService broker; + private ActiveMQConnectionFactory connectionFactory; + + /** + * + */ + public ActiveMQResourceAdapter() { + super(); + } + + /** + * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext) + */ + public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { + this.bootstrapContext = bootstrapContext; + if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) { + try { + broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); + broker.start(); + } catch (Throwable e) { + throw new ResourceAdapterInternalException("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e); + } + } + } + + /** + * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection() + */ + public ActiveMQConnection makeConnection() throws JMSException { + if (connectionFactory != null) { + return makeConnection(info, connectionFactory); + } + return makeConnection(info); + } + + /** + */ + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException { + + ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); + return makeConnection(info, connectionFactory); + } + + /** + * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo, org.apache.activemq.ActiveMQConnectionFactory) + */ + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException { + String userName = info.getUserName(); + String password = info.getPassword(); + ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); + + String clientId = info.getClientid(); + if (clientId != null && clientId.length() > 0) { + physicalConnection.setClientID(clientId); + } + return physicalConnection; + } + + /** + * @param activationSpec + */ + public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { + ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); + String userName = defaultValue(activationSpec.getUserName(), info.getUserName()); + String password = defaultValue(activationSpec.getPassword(), info.getPassword()); + String clientId = activationSpec.getClientId(); + if (clientId != null) { + connectionFactory.setClientID(clientId); + } + else { + if (activationSpec.isDurableSubscription()) { + log.warn("No clientID specified for durable subscription: " + activationSpec); + } + } + ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); + + // have we configured a redelivery policy + RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); + if (redeliveryPolicy != null) { + physicalConnection.setRedeliveryPolicy(redeliveryPolicy); + } + return physicalConnection; + } + + /** + * @param info + * @throws JMSException + * @throws URISyntaxException + */ + synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException { + ActiveMQConnectionFactory factory = connectionFactory; + if (factory != null && info.isConnectionFactoryConfigured()) { + factory = factory.copy(); + } + else if (factory == null) { + factory = new ActiveMQConnectionFactory(); + } + info.configure(factory); + return factory; + } + + private String defaultValue(String value, String defaultValue) { + if (value != null) + return value; + return defaultValue; + } + + /** + * @see javax.resource.spi.ResourceAdapter#stop() + */ + public void stop() { + while (endpointWorkers.size() > 0) { + ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next(); + endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); + } + if (broker != null) { + ServiceSupport.dispose(broker); + broker = null; + } + this.bootstrapContext = null; + } + + /** + * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext() + */ + public BootstrapContext getBootstrapContext() { + return bootstrapContext; + } + + /** + * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory, + * javax.resource.spi.ActivationSpec) + */ + public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) + throws ResourceException { + + // spec section 5.3.3 + if (!equals(activationSpec.getResourceAdapter())) { + throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")"); + } + + if (!(activationSpec instanceof MessageActivationSpec)) { + throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass()); + } + + ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, + (MessageActivationSpec) activationSpec); + // This is weird.. the same endpoint activated twice.. must be a + // container error. + if (endpointWorkers.containsKey(key)) { + throw new IllegalStateException("Endpoint previously activated"); + } + + ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key); + + endpointWorkers.put(key, worker); + worker.start(); + } + + /** + * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory, + * javax.resource.spi.ActivationSpec) + */ + public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { + + if (activationSpec instanceof MessageActivationSpec) { + ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec) activationSpec); + ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key); + if (worker == null) { + // This is weird.. that endpoint was not activated.. oh well.. + // this method + // does not throw exceptions so just return. + return; + } + try { + worker.stop(); + } catch (InterruptedException e) { + // We interrupted.. we won't throw an exception but will stop + // waiting for the worker + // to stop.. we tried our best. Keep trying to interrupt the + // thread. + Thread.currentThread().interrupt(); + } + + } + + } + + /** + * We only connect to one resource manager per ResourceAdapter instance, so + * any ActivationSpec will return the same XAResource. + * + * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[]) + */ + public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { + Connection connection = null; + try { + connection = makeConnection(); + if (connection instanceof XAConnection) { + XASession session = ((XAConnection) connection).createXASession(); + XAResource xaResource = session.getXAResource(); + return new XAResource[] { xaResource }; + } + return new XAResource[] {}; + } catch (JMSException e) { + throw new ResourceException(e); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + // + } + } + } + + // /////////////////////////////////////////////////////////////////////// + // + // Java Bean getters and setters for this ResourceAdapter class. + // + // /////////////////////////////////////////////////////////////////////// + + /** + * @return client id + */ + public String getClientid() { + return emptyToNull(info.getClientid()); + } + + /** + * @return password + */ + public String getPassword() { + return emptyToNull(info.getPassword()); + } + + /** + * @return server URL + */ + public String getServerUrl() { + return info.getServerUrl(); + } + + /** + * @return user name + */ + public String getUserName() { + return emptyToNull(info.getUserName()); + } + + /** + * @param clientid + */ + public void setClientid(String clientid) { + info.setClientid(clientid); + } + + /** + * @param password + */ + public void setPassword(String password) { + info.setPassword(password); + } + + /** + * @param url + */ + public void setServerUrl(String url) { + info.setServerUrl(url); + } + + /** + * @param userid + */ + public void setUserName(String userid) { + info.setUserName(userid); + } + + /** + * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig() + */ + public String getBrokerXmlConfig() { + return brokerXmlConfig; + } + + /** + * Sets the XML + * configuration file used to configure the ActiveMQ broker via Spring + * if using embedded mode. + * + * @param brokerXmlConfig + * is the filename which is assumed to be on the classpath unless + * a URL is specified. So a value of foo/bar.xml + * would be assumed to be on the classpath whereas + * file:dir/file.xml would use the file system. + * Any valid URL string is supported. + */ + public void setBrokerXmlConfig(String brokerXmlConfig) { + this.brokerXmlConfig=brokerXmlConfig; + } + + /** + * @return durable topic prefetch + */ + public Integer getDurableTopicPrefetch() { + return info.getDurableTopicPrefetch(); + } + + /** + * @return initial redelivery delay + */ + public Long getInitialRedeliveryDelay() { + return info.getInitialRedeliveryDelay(); + } + + /** + * @return input stream prefetch + */ + public Integer getInputStreamPrefetch() { + return info.getInputStreamPrefetch(); + } + + /** + * @return maximum redeliveries + */ + public Integer getMaximumRedeliveries() { + return info.getMaximumRedeliveries(); + } + + /** + * @return queue browser prefetch + */ + public Integer getQueueBrowserPrefetch() { + return info.getQueueBrowserPrefetch(); + } + + /** + * @return queue prefetch + */ + public Integer getQueuePrefetch() { + return info.getQueuePrefetch(); + } + + /** + * @return redelivery backoff multiplier + */ + public Short getRedeliveryBackOffMultiplier() { + return info.getRedeliveryBackOffMultiplier(); + } + + /** + * @return redelivery use exponential backoff + */ + public Boolean getRedeliveryUseExponentialBackOff() { + return info.getRedeliveryUseExponentialBackOff(); + } + + /** + * @return topic prefetch + */ + public Integer getTopicPrefetch() { + return info.getTopicPrefetch(); + } + + /** + * @return use inbound session enabled + */ + public boolean isUseInboundSessionEnabled() { + return info.isUseInboundSessionEnabled(); + } + + /** + * @param i + */ + public void setAllPrefetchValues(Integer i) { + info.setAllPrefetchValues(i); + } + + /** + * @param durableTopicPrefetch + */ + public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { + info.setDurableTopicPrefetch(durableTopicPrefetch); + } + + /** + * @param value + */ + public void setInitialRedeliveryDelay(Long value) { + info.setInitialRedeliveryDelay(value); + } + + /** + * @param inputStreamPrefetch + */ + public void setInputStreamPrefetch(Integer inputStreamPrefetch) { + info.setInputStreamPrefetch(inputStreamPrefetch); + } + + /** + * @param value + */ + public void setMaximumRedeliveries(Integer value) { + info.setMaximumRedeliveries(value); + } + + /** + * @param queueBrowserPrefetch + */ + public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { + info.setQueueBrowserPrefetch(queueBrowserPrefetch); + } + + /** + * @param queuePrefetch + */ + public void setQueuePrefetch(Integer queuePrefetch) { + info.setQueuePrefetch(queuePrefetch); + } + + /** + * @param value + */ + public void setRedeliveryBackOffMultiplier(Short value) { + info.setRedeliveryBackOffMultiplier(value); + } + + /** + * @param value + */ + public void setRedeliveryUseExponentialBackOff(Boolean value) { + info.setRedeliveryUseExponentialBackOff(value); + } + + /** + * @param topicPrefetch + */ + public void setTopicPrefetch(Integer topicPrefetch) { + info.setTopicPrefetch(topicPrefetch); + } + + /** + * @return Returns the info. + */ + public ActiveMQConnectionRequestInfo getInfo() { + return info; + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MessageResourceAdapter)) { + return false; + } + + final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter) o; + + if (!info.equals(activeMQResourceAdapter.getInfo())) { + return false; + } + if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig()) ) { + return false; + } + + return true; + } + + private boolean notEqual(Object o1, Object o2) { + return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); + } + + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + int result; + result = info.hashCode(); + if( brokerXmlConfig !=null ) { + result ^= brokerXmlConfig.hashCode(); + } + return result; + } + + private String emptyToNull(String value) { + if (value == null || value.length() == 0) { + return null; + } + return value; + } + + /** + * @return use inbound session + */ + public Boolean getUseInboundSession() { + return info.getUseInboundSession(); + } + + /** + * @param useInboundSession + */ + public void setUseInboundSession(Boolean useInboundSession) { + info.setUseInboundSession(useInboundSession); + } + + /** + * @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory() + */ + public ActiveMQConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + /** + * This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging. + * Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery + * and prefetch policies; the properties on the connectionFactory will be used instead. + */ + public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + +}