From d7355e787471d37842f0ffbca3c4251cb69ffd10 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Wed, 7 Mar 2007 14:10:33 +0000 Subject: [PATCH] applied modified version of AMQ-1147 (keeping the implementation classes the same name to avoid issues with existing RA configurations) git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515574 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ra/ActiveMQActivationSpec.java | 605 ------------------ .../ra/ActiveMQEndpointActivationKey.java | 6 +- .../activemq/ra/ActiveMQEndpointWorker.java | 26 +- .../ra/ActiveMQManagedConnectionFactory.java | 108 +++- .../activemq/ra/ActiveMQResourceAdapter.java | 511 --------------- .../activemq/ra/MessageActivationSpec.java | 139 ++++ .../activemq/ra/MessageResourceAdapter.java | 80 +++ .../activemq/ra/ServerSessionPoolImpl.java | 6 +- 8 files changed, 351 insertions(+), 1130 deletions(-) delete mode 100755 activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java delete mode 100755 activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java create mode 100755 activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java create mode 100755 activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java deleted file mode 100755 index 4ddec1f24b..0000000000 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java +++ /dev/null @@ -1,605 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.ra; - -import java.beans.IntrospectionException; -import java.beans.PropertyDescriptor; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.resource.ResourceException; -import javax.resource.spi.ActivationSpec; -import javax.resource.spi.InvalidPropertyException; -import javax.resource.spi.ResourceAdapter; - -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.selector.SelectorParser; - -/** - * Configures the inbound JMS consumer specification using ActiveMQ - * - * @org.apache.xbean.XBean element="activationSpec" - * - * @version $Revision$ $Date$ - */ -public class ActiveMQActivationSpec implements ActivationSpec, Serializable { - - private static final long serialVersionUID = -7153087544100459975L; - - /** Auto-acknowledge constant for acknowledgeMode property **/ - public static final String AUTO_ACKNOWLEDGE_MODE = "Auto-acknowledge"; - /** Dups-ok-acknowledge constant for acknowledgeMode property * */ - public static final String DUPS_OK_ACKNOWLEDGE_MODE = "Dups-ok-acknowledge"; - /** Durable constant for subscriptionDurability property * */ - public static final String DURABLE_SUBSCRIPTION = "Durable"; - /** NonDurable constant for subscriptionDurability property * */ - public static final String NON_DURABLE_SUBSCRIPTION = "NonDurable"; - - public static final int INVALID_ACKNOWLEDGE_MODE = -1; - - private transient ActiveMQResourceAdapter resourceAdapter; - private String destinationType; - private String messageSelector; - private String destination; - private String acknowledgeMode = AUTO_ACKNOWLEDGE_MODE; - private String userName; - private String password; - private String clientId; - private String subscriptionName; - private String subscriptionDurability = NON_DURABLE_SUBSCRIPTION; - private String noLocal = "false"; - private String useRAManagedTransaction = "false"; - private String maxSessions="10"; - private String maxMessagesPerSessions="10"; - private String enableBatch = "false"; - private String maxMessagesPerBatch = "10"; - private RedeliveryPolicy redeliveryPolicy; - - - /** - * @see javax.resource.spi.ActivationSpec#validate() - */ - public void validate() throws InvalidPropertyException { - List errorMessages = new ArrayList(); - List propsNotSet = new ArrayList(); - try { - if (!isValidDestination(errorMessages)) - propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class)); - if (!isValidDestinationType(errorMessages)) - propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class)); - if (!isValidAcknowledgeMode(errorMessages)) - propsNotSet.add(new PropertyDescriptor("acknowledgeMode", ActiveMQActivationSpec.class)); - if (!isValidSubscriptionDurability(errorMessages)) - propsNotSet.add(new PropertyDescriptor("subscriptionDurability", ActiveMQActivationSpec.class)); - if (!isValidClientId(errorMessages)) - propsNotSet.add(new PropertyDescriptor("clientId", ActiveMQActivationSpec.class)); - if (!isValidSubscriptionName(errorMessages)) - propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class)); - if (!isValidMaxMessagesPerSessions(errorMessages)) - propsNotSet.add(new PropertyDescriptor("maxMessagesPerSessions", ActiveMQActivationSpec.class)); - if (!isValidMaxSessions(errorMessages)) - propsNotSet.add(new PropertyDescriptor("maxSessions", ActiveMQActivationSpec.class)); - if (!isValidMessageSelector(errorMessages)) - propsNotSet.add(new PropertyDescriptor("messageSelector", ActiveMQActivationSpec.class)); - if (!isValidNoLocal(errorMessages)) - propsNotSet.add(new PropertyDescriptor("noLocal", ActiveMQActivationSpec.class)); - if (!isValidUseRAManagedTransaction(errorMessages)) - propsNotSet.add(new PropertyDescriptor("useRAManagedTransaction", ActiveMQActivationSpec.class)); - if (!isValidEnableBatch(errorMessages)) - propsNotSet.add(new PropertyDescriptor("enableBatch", ActiveMQActivationSpec.class)); - if (!isValidMaxMessagesPerBatch(errorMessages)) - propsNotSet.add(new PropertyDescriptor("maxMessagesPerBatch", ActiveMQActivationSpec.class)); - - - } catch (IntrospectionException e) { - e.printStackTrace(); - } - - if (propsNotSet.size() > 0) { - StringBuffer b = new StringBuffer(); - b.append("Invalid settings:"); - for (Iterator iter = errorMessages.iterator(); iter.hasNext();) { - b.append(" "); - b.append(iter.next()); - } - InvalidPropertyException e = new InvalidPropertyException(b.toString()); - final PropertyDescriptor[] descriptors = (PropertyDescriptor[]) propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]); - e.setInvalidPropertyDescriptors(descriptors); - throw e; - } - } - - private boolean isValidUseRAManagedTransaction(List errorMessages) { - try { - new Boolean(noLocal); - return true; - } catch (Throwable e) { - } - errorMessages.add("noLocal must be set to: true or false."); - return false; - } - - private boolean isValidNoLocal(List errorMessages) { - try { - new Boolean(noLocal); - return true; - } catch (Throwable e) { - } - errorMessages.add("noLocal must be set to: true or false."); - return false; - } - - private boolean isValidMessageSelector(List errorMessages) { - try { - if( !isEmpty(messageSelector) ) { - new SelectorParser().parse(messageSelector); - } - return true; - } catch (Throwable e) { - errorMessages.add("messageSelector not set to valid message selector: "+e.getMessage()); - return false; - } - } - - private boolean isValidMaxSessions(List errorMessages) { - try { - if( Integer.parseInt(maxSessions) > 0 ) { - return true; - } - } catch (NumberFormatException e) { - } - errorMessages.add("maxSessions must be set to number > 0"); - return false; - } - - private boolean isValidMaxMessagesPerSessions(List errorMessages) { - try { - if( Integer.parseInt(maxMessagesPerSessions) > 0 ) { - return true; - } - } catch (NumberFormatException e) { - } - errorMessages.add("maxMessagesPerSessions must be set to number > 0"); - return false; - } - - private boolean isValidMaxMessagesPerBatch(List errorMessages) { - try { - if( Integer.parseInt(maxMessagesPerBatch) > 0 ) { - return true; - } - } catch (NumberFormatException e) { - } - errorMessages.add("maxMessagesPerBatch must be set to number > 0"); - return false; - } - - private boolean isValidEnableBatch(List errorMessages) { - try { - new Boolean(enableBatch); - return true; - } catch (Throwable e) { - } - errorMessages.add("enableBatch must be set to: true or false"); - return false; - } - - /** - * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter() - */ - public ResourceAdapter getResourceAdapter() { - return resourceAdapter; - } - - /** - * @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter) - */ - public void setResourceAdapter(ResourceAdapter resourceAdapter) throws ResourceException { - //spec section 5.3.3 - if (this.resourceAdapter != null) { - throw new ResourceException("ResourceAdapter already set"); - } - if (!(resourceAdapter instanceof ActiveMQResourceAdapter)) { - throw new ResourceException("ResourceAdapter is not of type: " + ActiveMQResourceAdapter.class.getName()); - } - this.resourceAdapter = (ActiveMQResourceAdapter) resourceAdapter; - } - - - ///////////////////////////////////////////////////////////////////////// - // - // Java Bean getters and setters for this ActivationSpec class. - // - ///////////////////////////////////////////////////////////////////////// - /** - * @return Returns the destinationType. - */ - public String getDestinationType() { - if (!isEmpty(destinationType)) { - return destinationType; - } - return null; - } - - /** - * @param destinationType The destinationType to set. - */ - public void setDestinationType(String destinationType) { - this.destinationType = destinationType; - } - - public String getPassword() { - if (!isEmpty(password)) { - return password; - } - return null; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getUserName() { - if (!isEmpty(userName)) { - return userName; - } - return null; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - /** - * @return Returns the messageSelector. - */ - public String getMessageSelector() { - if (!isEmpty(messageSelector)) { - return messageSelector; - } - return null; - } - - /** - * @param messageSelector The messageSelector to set. - */ - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - /** - * @return Returns the noLocal. - */ - public String getNoLocal() { - return noLocal; - } - - /** - * @param noLocal The noLocal to set. - */ - public void setNoLocal(String noLocal) { - if( noLocal!=null ) { - this.noLocal = noLocal; - } - } - - public String getAcknowledgeMode() { - if (!isEmpty(acknowledgeMode)) { - return acknowledgeMode; - } - return null; - } - - public void setAcknowledgeMode(String acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public String getClientId() { - if (!isEmpty(clientId)) { - return clientId; - } - return null; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getDestination() { - if (!isEmpty(destination)) { - return destination; - } - return null; - } - - public void setDestination(String destination) { - this.destination = destination; - } - - public String getSubscriptionDurability() { - if (!isEmpty(subscriptionDurability)) { - return subscriptionDurability; - } - return null; - } - - public void setSubscriptionDurability(String subscriptionDurability) { - this.subscriptionDurability = subscriptionDurability; - } - - public String getSubscriptionName() { - if (!isEmpty(subscriptionName)) { - return subscriptionName; - } - return null; - } - - public void setSubscriptionName(String subscriptionName) { - this.subscriptionName = subscriptionName; - } - - private boolean isValidSubscriptionName(List errorMessages) { - if( !isDurableSubscription() ? true : subscriptionName != null && subscriptionName.trim().length() > 0 ) { - return true; - } - errorMessages.add("subscriptionName must be set since durable subscription was requested."); - return false; - } - - private boolean isValidClientId(List errorMessages) { - if( !isDurableSubscription() ? true : clientId != null && clientId.trim().length() > 0 ) { - return true; - } - errorMessages.add("clientId must be set since durable subscription was requested."); - return false; - } - - public boolean isDurableSubscription() { - return DURABLE_SUBSCRIPTION.equals(subscriptionDurability); - } - - private boolean isValidSubscriptionDurability(List errorMessages) { - // subscriptionDurability only applies to Topics - if ( DURABLE_SUBSCRIPTION.equals(subscriptionDurability) && - getDestinationType() != null && !Topic.class.getName().equals(getDestinationType())) { - errorMessages.add("subscriptionDurability cannot be set to: "+DURABLE_SUBSCRIPTION+" when destinationType is set to "+ - Queue.class.getName()+" as it is only valid when destinationType is set to "+Topic.class.getName()+"."); - return false; - } - if (NON_DURABLE_SUBSCRIPTION.equals(subscriptionDurability) || DURABLE_SUBSCRIPTION.equals(subscriptionDurability)) - return true; - errorMessages.add("subscriptionDurability must be set to: "+NON_DURABLE_SUBSCRIPTION+" or "+DURABLE_SUBSCRIPTION+"."); - return false; - } - - private boolean isValidAcknowledgeMode(List errorMessages) { - if (AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) || DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode)) - return true; - errorMessages.add("acknowledgeMode must be set to: "+AUTO_ACKNOWLEDGE_MODE+" or "+DUPS_OK_ACKNOWLEDGE_MODE+"."); - return false; - } - - private boolean isValidDestinationType(List errorMessages) { - if (Queue.class.getName().equals(destinationType) || Topic.class.getName().equals(destinationType)) - return true; - errorMessages.add("destinationType must be set to: "+Queue.class.getName()+" or "+Topic.class.getName()+"."); - return false; - } - - private boolean isValidDestination(List errorMessages) { - if(!(destination == null || destination.equals(""))) - return true; - errorMessages.add("destination is a required field and must be set to the destination name."); - return false; - } - - private boolean isEmpty(String value) { - return value == null || "".equals(value.trim()); - } - - public String toString() { - return "ActiveMQActivationSpec{" + - "acknowledgeMode='" + acknowledgeMode + "'" + - ", destinationType='" + destinationType + "'" + - ", messageSelector='" + messageSelector + "'" + - ", destination='" + destination + "'" + - ", clientId='" + clientId + "'" + - ", subscriptionName='" + subscriptionName + "'" + - ", subscriptionDurability='" + subscriptionDurability + "'" + - "}"; - } - - public int getAcknowledgeModeForSession() { - if( AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) { - return Session.AUTO_ACKNOWLEDGE; - } else if( DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) { - return Session.DUPS_OK_ACKNOWLEDGE; - } else { - return INVALID_ACKNOWLEDGE_MODE; - } - } - - /** - * A helper method mostly for use in Dependency Injection containers - * which allows you to customize the destination and destinationType properties - * from a single ActiveMQDestination POJO - */ - public void setActiveMQDestination(ActiveMQDestination destination) { - setDestination(destination.getPhysicalName()); - if (destination instanceof Queue) { - setDestinationType(Queue.class.getName()); - } - else { - setDestinationType(Topic.class.getName()); - } - } - - public ActiveMQDestination createDestination() { - if( isEmpty(destinationType) || isEmpty(destination) ) - return null; - - ActiveMQDestination dest = null; - if (Queue.class.getName().equals(destinationType)) { - dest = new ActiveMQQueue(destination); - } else if (Topic.class.getName().equals(destinationType)) { - dest = new ActiveMQTopic(destination); - } else { - assert false : "Execution should never reach here"; - } - return dest; - } - - public String getMaxMessagesPerSessions() { - return maxMessagesPerSessions.toString(); - } - - public void setMaxMessagesPerSessions(String maxMessagesPerSessions) { - if( maxMessagesPerSessions!=null ) { - this.maxMessagesPerSessions = maxMessagesPerSessions; - } - } - - - public String getMaxSessions() { - return maxSessions; - } - public void setMaxSessions(String maxSessions) { - if( maxSessions!=null ) { - this.maxSessions = maxSessions; - } - } - - public String getUseRAManagedTransaction() { - return useRAManagedTransaction; - } - - public void setUseRAManagedTransaction(String useRAManagedTransaction) { - if( useRAManagedTransaction!=null ) { - this.useRAManagedTransaction = useRAManagedTransaction; - } - } - - public int getMaxMessagesPerSessionsIntValue() { - return Integer.parseInt(maxMessagesPerSessions); - } - - public int getMaxSessionsIntValue() { - return Integer.parseInt(maxSessions); - } - - public boolean isUseRAManagedTransactionEnabled() { - return new Boolean(useRAManagedTransaction).booleanValue(); - } - - public boolean getNoLocalBooleanValue() { - return new Boolean(noLocal).booleanValue(); - } - - public String getEnableBatch() { - return enableBatch; - } - - public void setEnableBatch(String enableBatch) { - if (enableBatch != null) { - this.enableBatch = enableBatch; - } - } - - public boolean getEnableBatchBooleanValue() { - return new Boolean(enableBatch).booleanValue(); - } - - public int getMaxMessagesPerBatchIntValue() { - return Integer.parseInt(maxMessagesPerBatch); - } - - public String getMaxMessagesPerBatch() { - return maxMessagesPerBatch.toString(); - } - - public void setMaxMessagesPerBatch(String maxMessagesPerBatch) { - if (maxMessagesPerBatch != null) { - this.maxMessagesPerBatch = maxMessagesPerBatch; - } - } - - public short getBackOffMultiplier() { - if (redeliveryPolicy == null) { - return 0; - } - return redeliveryPolicy.getBackOffMultiplier(); - } - - public long getInitialRedeliveryDelay() { - if (redeliveryPolicy == null) { - return 0; - } - return redeliveryPolicy.getInitialRedeliveryDelay(); - } - - public int getMaximumRedeliveries() { - if (redeliveryPolicy == null) { - return 0; - } - return redeliveryPolicy.getMaximumRedeliveries(); - } - - public boolean isUseExponentialBackOff() { - if (redeliveryPolicy == null) { - return false; - } - return redeliveryPolicy.isUseExponentialBackOff(); - } - - public void setBackOffMultiplier(short backOffMultiplier) { - lazyCreateRedeliveryPolicy().setBackOffMultiplier(backOffMultiplier); - } - - public void setInitialRedeliveryDelay(long initialRedeliveryDelay) { - lazyCreateRedeliveryPolicy().setInitialRedeliveryDelay(initialRedeliveryDelay); - } - - public void setMaximumRedeliveries(int maximumRedeliveries) { - lazyCreateRedeliveryPolicy().setMaximumRedeliveries(maximumRedeliveries); - } - - public void setUseExponentialBackOff(boolean useExponentialBackOff) { - lazyCreateRedeliveryPolicy().setUseExponentialBackOff(useExponentialBackOff); - } - - // don't use getter to avoid causing introspection errors in containers - public RedeliveryPolicy redeliveryPolicy() { - return redeliveryPolicy; - } - - protected RedeliveryPolicy lazyCreateRedeliveryPolicy() { - if (redeliveryPolicy == null) { - redeliveryPolicy = new RedeliveryPolicy(); - } - return redeliveryPolicy; - } -} - diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java index b77676ebfc..647be67816 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java @@ -22,12 +22,12 @@ import javax.resource.spi.endpoint.MessageEndpointFactory; public class ActiveMQEndpointActivationKey { final private MessageEndpointFactory messageEndpointFactory; - final private ActiveMQActivationSpec activationSpec; + final private MessageActivationSpec activationSpec; /** * @return Returns the activationSpec. */ - public ActiveMQActivationSpec getActivationSpec() { + public MessageActivationSpec getActivationSpec() { return activationSpec; } @@ -49,7 +49,7 @@ public class ActiveMQEndpointActivationKey { * @param messageEndpointFactory * @param activationSpec */ - public ActiveMQEndpointActivationKey(MessageEndpointFactory messageEndpointFactory, ActiveMQActivationSpec activationSpec) { + public ActiveMQEndpointActivationKey(MessageEndpointFactory messageEndpointFactory, MessageActivationSpec activationSpec) { this.messageEndpointFactory = messageEndpointFactory; this.activationSpec = activationSpec; } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index a101ed004e..4cdee672a8 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -46,6 +46,10 @@ import org.apache.commons.logging.LogFactory; public class ActiveMQEndpointWorker { private static final Log log = LogFactory.getLog(ActiveMQEndpointWorker.class); + + /** + * + */ public static final Method ON_MESSAGE_METHOD; private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second. @@ -61,7 +65,7 @@ public class ActiveMQEndpointWorker { } } - protected ActiveMQResourceAdapter adapter; + protected MessageResourceAdapter adapter; protected ActiveMQEndpointActivationKey endpointActivationKey; protected MessageEndpointFactory endpointFactory; protected WorkManager workManager; @@ -88,6 +92,7 @@ public class ActiveMQEndpointWorker { } } catch (JMSException e) { + // } } @@ -101,6 +106,7 @@ public class ActiveMQEndpointWorker { } } catch (JMSException e) { + // } } @@ -114,10 +120,14 @@ public class ActiveMQEndpointWorker { } } catch (JMSException e) { + // } } - public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { + /** + * + */ + public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { this.endpointActivationKey = key; this.adapter = adapter; this.endpointFactory = endpointActivationKey.getMessageEndpointFactory(); @@ -132,6 +142,7 @@ public class ActiveMQEndpointWorker { connectWork = new Work() { public void release() { + // } synchronized public void run() { @@ -140,7 +151,7 @@ public class ActiveMQEndpointWorker { if( connection!=null ) return; - ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); + MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); try { connection = adapter.makeConnection(activationSpec); connection.start(); @@ -176,7 +187,7 @@ public class ActiveMQEndpointWorker { } }; - ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); + MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) { dest = new ActiveMQQueue(activationSpec.getDestination()); } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) { @@ -187,6 +198,9 @@ public class ActiveMQEndpointWorker { } + /** + * + */ synchronized public void start() throws WorkException, ResourceException { if (running) return; @@ -257,7 +271,9 @@ public class ActiveMQEndpointWorker { this.reconnectDelay=MAX_RECONNECT_DELAY; } connect(); - } catch(InterruptedException e) {} + } catch(InterruptedException e) { + // + } } protected void registerThreadSession(Session session) { diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java index f85b3e79b9..8cbad0050d 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java @@ -42,12 +42,18 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor private static final long serialVersionUID = 6196921962230582875L; - private ActiveMQResourceAdapter adapter; + private MessageResourceAdapter adapter; private PrintWriter logWriter; private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); + /** + * @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter) + */ public void setResourceAdapter(ResourceAdapter adapter) throws ResourceException { - this.adapter = (ActiveMQResourceAdapter) adapter; + if (!(adapter instanceof MessageResourceAdapter)) { + throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName()); + } + this.adapter = (MessageResourceAdapter) adapter; ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy(); if (info.getClientid() == null) info.setClientid(baseInfo.getClientid()); @@ -61,6 +67,10 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor info.setUserName(baseInfo.getUserName()); } + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override public boolean equals(Object object) { if( object == null || object.getClass()!=ActiveMQManagedConnectionFactory.class ) { return false; @@ -68,11 +78,18 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor return ((ActiveMQManagedConnectionFactory)object).info.equals(info); } + /** + * @see java.lang.Object#hashCode() + */ + @Override public int hashCode() { return info.hashCode(); } + /** + * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter() + */ public ResourceAdapter getResourceAdapter() { return adapter; } @@ -155,117 +172,204 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor // // ///////////////////////////////////////////////////////////////////////// + /** + * + */ public String getClientid() { return info.getClientid(); } + /** + * + */ public String getPassword() { return info.getPassword(); } + /** + * + */ public String getUserName() { return info.getUserName(); } + /** + * + */ public void setClientid(String clientid) { info.setClientid(clientid); } + /** + * + */ public void setPassword(String password) { info.setPassword(password); } + /** + * + */ public void setUserName(String userid) { info.setUserName(userid); } + /** + * + */ + /** + * + */ public Boolean getUseInboundSession() { return info.getUseInboundSession(); } + /** + * + */ public void setUseInboundSession(Boolean useInboundSession) { info.setUseInboundSession(useInboundSession); } + /** + * + */ public boolean isUseInboundSessionEnabled() { return info.isUseInboundSessionEnabled(); } // Redelivery policy configuration + /** + * + */ public Long getInitialRedeliveryDelay() { return info.getInitialRedeliveryDelay(); } + /** + * + */ public Integer getMaximumRedeliveries() { return info.getMaximumRedeliveries(); } + /** + * + */ public Short getRedeliveryBackOffMultiplier() { return info.getRedeliveryBackOffMultiplier(); } + /** + * + */ public Boolean getRedeliveryUseExponentialBackOff() { return info.getRedeliveryUseExponentialBackOff(); } + /** + * + */ public void setInitialRedeliveryDelay(Long value) { info.setInitialRedeliveryDelay(value); } + /** + * + */ public void setMaximumRedeliveries(Integer value) { info.setMaximumRedeliveries(value); } + /** + * + */ public void setRedeliveryBackOffMultiplier(Short value) { info.setRedeliveryBackOffMultiplier(value); } + /** + * + */ public void setRedeliveryUseExponentialBackOff(Boolean value) { info.setRedeliveryUseExponentialBackOff(value); } // Prefetch policy configuration + /** + * + */ public Integer getDurableTopicPrefetch() { return info.getDurableTopicPrefetch(); } + /** + * + */ public Integer getInputStreamPrefetch() { return info.getInputStreamPrefetch(); } + /** + * + */ public Integer getQueueBrowserPrefetch() { return info.getQueueBrowserPrefetch(); } + /** + * + */ public Integer getQueuePrefetch() { return info.getQueuePrefetch(); } + /** + * + */ public Integer getTopicPrefetch() { return info.getTopicPrefetch(); } + /** + * + */ public void setAllPrefetchValues(Integer i) { info.setAllPrefetchValues(i); } + /** + * + */ public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { info.setDurableTopicPrefetch(durableTopicPrefetch); } + /** + * + */ public void setInputStreamPrefetch(Integer inputStreamPrefetch) { info.setInputStreamPrefetch(inputStreamPrefetch); } + /** + * + */ public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { info.setQueueBrowserPrefetch(queueBrowserPrefetch); } + /** + * + */ public void setQueuePrefetch(Integer queuePrefetch) { info.setQueuePrefetch(queuePrefetch); } + /** + * @param topicPrefetch + */ public void setTopicPrefetch(Integer topicPrefetch) { info.setTopicPrefetch(topicPrefetch); } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java deleted file mode 100755 index c4a80ae4af..0000000000 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ /dev/null @@ -1,511 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.ra; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.ServiceSupport; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.XAConnection; -import javax.jms.XASession; -import javax.resource.NotSupportedException; -import javax.resource.ResourceException; -import javax.resource.spi.ActivationSpec; -import javax.resource.spi.BootstrapContext; -import javax.resource.spi.ResourceAdapter; -import javax.resource.spi.ResourceAdapterInternalException; -import javax.resource.spi.endpoint.MessageEndpointFactory; -import javax.transaction.xa.XAResource; - -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; - -/** - * Knows how to connect to one ActiveMQ server. It can then activate endpoints - * and deliver messages to those end points using the connection configure in the - * resource adapter.

Must override equals and hashCode (JCA spec 16.4) - * - * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true" - * description="The JCA Resource Adaptor for ActiveMQ" - * - * @version $Revision$ - */ -public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { - - private static final long serialVersionUID = -5417363537865649130L; - private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class); - - private final HashMap endpointWorkers = new HashMap(); - private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); - - private BootstrapContext bootstrapContext; - private String brokerXmlConfig; - private BrokerService broker; - private ActiveMQConnectionFactory connectionFactory; - - public ActiveMQResourceAdapter() { - } - - /** - * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext) - */ - public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { - this.bootstrapContext = bootstrapContext; - if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) { - try { - broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); - broker.start(); - } catch (Throwable e) { - throw new ResourceAdapterInternalException("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e); - } - } - } - - public ActiveMQConnection makeConnection() throws JMSException { - if (connectionFactory != null) { - return makeConnection(info, connectionFactory); - } - else { - return makeConnection(info); - } - } - - /** - */ - public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException { - - ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); - return makeConnection(info, connectionFactory); - } - - public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException { - String userName = info.getUserName(); - String password = info.getPassword(); - ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); - - String clientId = info.getClientid(); - if (clientId != null && clientId.length() > 0) { - physicalConnection.setClientID(clientId); - } - return physicalConnection; - } - - /** - * @param activationSpec - */ - public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException { - ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); - String userName = defaultValue(activationSpec.getUserName(), info.getUserName()); - String password = defaultValue(activationSpec.getPassword(), info.getPassword()); - String clientId = activationSpec.getClientId(); - if (clientId != null) { - connectionFactory.setClientID(clientId); - } - else { - if (activationSpec.isDurableSubscription()) { - log.warn("No clientID specified for durable subscription: " + activationSpec); - } - } - ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); - - // have we configured a redelivery policy - RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); - if (redeliveryPolicy != null) { - physicalConnection.setRedeliveryPolicy(redeliveryPolicy); - } - return physicalConnection; - } - - /** - * @param info - * @return - * @throws JMSException - * @throws URISyntaxException - */ - synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException { - ActiveMQConnectionFactory factory = connectionFactory; - if (factory != null && info.isConnectionFactoryConfigured()) { - factory = factory.copy(); - } - else if (factory == null) { - factory = new ActiveMQConnectionFactory(); - } - info.configure(factory); - return factory; - } - - private String defaultValue(String value, String defaultValue) { - if (value != null) - return value; - return defaultValue; - } - - /** - * @see javax.resource.spi.ResourceAdapter#stop() - */ - public void stop() { - while (endpointWorkers.size() > 0) { - ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next(); - endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); - } - if (broker != null) { - ServiceSupport.dispose(broker); - broker = null; - } - this.bootstrapContext = null; - } - - /** - * @return - */ - public BootstrapContext getBootstrapContext() { - return bootstrapContext; - } - - /** - * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory, - * javax.resource.spi.ActivationSpec) - */ - public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) - throws ResourceException { - - // spec section 5.3.3 - if (activationSpec.getResourceAdapter() != this) { - throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance"); - } - - if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) { - - ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, - (ActiveMQActivationSpec) activationSpec); - // This is weird.. the same endpoint activated twice.. must be a - // container error. - if (endpointWorkers.containsKey(key)) { - throw new IllegalStateException("Endpoint previously activated"); - } - - ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key); - - endpointWorkers.put(key, worker); - worker.start(); - - } else { - throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass()); - } - - } - - /** - * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory, - * javax.resource.spi.ActivationSpec) - */ - public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { - - if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) { - ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec); - ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key); - if (worker == null) { - // This is weird.. that endpoint was not activated.. oh well.. - // this method - // does not throw exceptions so just return. - return; - } - try { - worker.stop(); - } catch (InterruptedException e) { - // We interrupted.. we won't throw an exception but will stop - // waiting for the worker - // to stop.. we tried our best. Keep trying to interrupt the - // thread. - Thread.currentThread().interrupt(); - } - - } - - } - - /** - * We only connect to one resource manager per ResourceAdapter instance, so - * any ActivationSpec will return the same XAResource. - * - * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[]) - */ - public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { - Connection connection = null; - try { - connection = makeConnection(); - if (connection instanceof XAConnection) { - XASession session = ((XAConnection) connection).createXASession(); - XAResource xaResource = session.getXAResource(); - return new XAResource[] { xaResource }; - } else { - return new XAResource[] {}; - } - } catch (JMSException e) { - throw new ResourceException(e); - } finally { - try { - connection.close(); - } catch (Throwable ignore) { - } - } - } - - // /////////////////////////////////////////////////////////////////////// - // - // Java Bean getters and setters for this ResourceAdapter class. - // - // /////////////////////////////////////////////////////////////////////// - - /** - * @return - */ - public String getClientid() { - return emptyToNull(info.getClientid()); - } - - /** - * @return - */ - public String getPassword() { - return emptyToNull(info.getPassword()); - } - - /** - * @return - */ - public String getServerUrl() { - return info.getServerUrl(); - } - - /** - * @return - */ - public String getUserName() { - return emptyToNull(info.getUserName()); - } - - /** - * @param clientid - */ - public void setClientid(String clientid) { - info.setClientid(clientid); - } - - /** - * @param password - */ - public void setPassword(String password) { - info.setPassword(password); - } - - /** - * @param url - */ - public void setServerUrl(String url) { - info.setServerUrl(url); - } - - /** - * @param userid - */ - public void setUserName(String userid) { - info.setUserName(userid); - } - - public String getBrokerXmlConfig() { - return brokerXmlConfig; - } - - /** - * Sets the XML - * configuration file used to configure the ActiveMQ broker via Spring - * if using embedded mode. - * - * @param brokerXmlConfig - * is the filename which is assumed to be on the classpath unless - * a URL is specified. So a value of foo/bar.xml - * would be assumed to be on the classpath whereas - * file:dir/file.xml would use the file system. - * Any valid URL string is supported. - * @see #setUseEmbeddedBroker(Boolean) - */ - public void setBrokerXmlConfig(String brokerXmlConfig) { - this.brokerXmlConfig=brokerXmlConfig; - } - - public Integer getDurableTopicPrefetch() { - return info.getDurableTopicPrefetch(); - } - - public Long getInitialRedeliveryDelay() { - return info.getInitialRedeliveryDelay(); - } - - public Integer getInputStreamPrefetch() { - return info.getInputStreamPrefetch(); - } - - public Integer getMaximumRedeliveries() { - return info.getMaximumRedeliveries(); - } - - public Integer getQueueBrowserPrefetch() { - return info.getQueueBrowserPrefetch(); - } - - public Integer getQueuePrefetch() { - return info.getQueuePrefetch(); - } - - public Short getRedeliveryBackOffMultiplier() { - return info.getRedeliveryBackOffMultiplier(); - } - - public Boolean getRedeliveryUseExponentialBackOff() { - return info.getRedeliveryUseExponentialBackOff(); - } - - public Integer getTopicPrefetch() { - return info.getTopicPrefetch(); - } - - public boolean isUseInboundSessionEnabled() { - return info.isUseInboundSessionEnabled(); - } - - public void setAllPrefetchValues(Integer i) { - info.setAllPrefetchValues(i); - } - - public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { - info.setDurableTopicPrefetch(durableTopicPrefetch); - } - - public void setInitialRedeliveryDelay(Long value) { - info.setInitialRedeliveryDelay(value); - } - - public void setInputStreamPrefetch(Integer inputStreamPrefetch) { - info.setInputStreamPrefetch(inputStreamPrefetch); - } - - public void setMaximumRedeliveries(Integer value) { - info.setMaximumRedeliveries(value); - } - - public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { - info.setQueueBrowserPrefetch(queueBrowserPrefetch); - } - - public void setQueuePrefetch(Integer queuePrefetch) { - info.setQueuePrefetch(queuePrefetch); - } - - public void setRedeliveryBackOffMultiplier(Short value) { - info.setRedeliveryBackOffMultiplier(value); - } - - public void setRedeliveryUseExponentialBackOff(Boolean value) { - info.setRedeliveryUseExponentialBackOff(value); - } - - public void setTopicPrefetch(Integer topicPrefetch) { - info.setTopicPrefetch(topicPrefetch); - } - - /** - * @return Returns the info. - */ - public ActiveMQConnectionRequestInfo getInfo() { - return info; - } - - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ActiveMQResourceAdapter)) { - return false; - } - - final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o; - - if (!info.equals(activeMQResourceAdapter.info)) { - return false; - } - if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) { - return false; - } - - return true; - } - - private boolean notEqual(Object o1, Object o2) { - return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); - } - - - public int hashCode() { - int result; - result = info.hashCode(); - if( brokerXmlConfig !=null ) { - result ^= brokerXmlConfig.hashCode(); - } - return result; - } - - private String emptyToNull(String value) { - if (value == null || value.length() == 0) { - return null; - } - return value; - } - - public Boolean getUseInboundSession() { - return info.getUseInboundSession(); - } - - public void setUseInboundSession(Boolean useInboundSession) { - info.setUseInboundSession(useInboundSession); - } - - public ActiveMQConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - /** - * This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging. - * Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery - * and prefetch policies; the properties on the connectionFactory will be used instead. - */ - public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - } - - -} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java new file mode 100755 index 0000000000..5e5fd6ad44 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java @@ -0,0 +1,139 @@ +/** + * $Header$ + * + * Broker Office ESPECIAL (Release) - org.apache.activemq.ra + * + * Copyright (C) 2005-2007 Norvax, Inc. + * All Rights Reserved + * + * This is UNPUBLISHED PROPRIETARY SOURCE CODE of Norvax, Inc.; the contents + * of this file may not be disclosed to third parties, copied or duplicated + * in any form, in whole or in part, without the prior written permission of + * Norvax, Inc. The copyright notice above does not evidence any actual or + * intended publication of such source code. + * + * Permission is hereby granted solely to the licensee for use of this source + * code in its unaltered state. This source code may not be modified by + * licensee except under direction of Norvax, Inc. Neither may this source + * code be given under any circumstances to non-licensees in any form, + * including source or binary. Modification of this source constitutes breach + * of contract, which voids any potential pending support responsibilities by + * Norvax, Inc. Divulging the exact or paraphrased contents of this source + * code to unlicensed parties either directly or indirectly constitutes + * violation of federal and international copyright and trade secret laws, and + * will be duly prosecuted to the fullest extent permitted under law. + * + * This software is provided by Norvax, Inc. ``as is'' and any express or + * implied warranties, including, but not limited to, the implied warranties + * of merchantability and fitness for a particular purpose are disclaimed. In + * no event shall the regents or contributors be liable for any direct, + * indirect, incidental, special, exemplary, or consequential damages + * (including, but not limited to, procurement of substitute goods or + * services; loss of use, data, or profits; or business interruption) however + * caused and on any theory of liability, whether in contract, strict + * liability, or tort (including negligence or otherwise) arising in any way + * out of the use of this software, even if advised of the possibility of such + * damage. + * + **/ + +package org.apache.activemq.ra; + +import javax.resource.spi.ActivationSpec; + +import org.apache.activemq.RedeliveryPolicy; + +/** + * Description: Description goes here. + * + * @author Christopher G. Stach II + * @version $Revision$ $Date$ + * @since 0.1 + */ +public interface MessageActivationSpec + extends ActivationSpec +{ + + /** + */ + String getClientId(); + + /** + */ + boolean isDurableSubscription(); + + /** + */ + String getPassword(); + + /** + */ + String getUserName(); + + /** + */ + RedeliveryPolicy redeliveryPolicy(); + + /** + */ + String getSubscriptionName(); + + /** + */ + String getMessageSelector(); + + /** + */ + int getMaxMessagesPerSessionsIntValue(); + + /** + */ + boolean getNoLocalBooleanValue(); + + /** + */ + String getDestinationType(); + + /** + */ + String getDestination(); + + /** + */ + int getMaxSessionsIntValue(); + + /** + * + */ + int getAcknowledgeModeForSession(); + + /** + * + */ + String getMaxMessagesPerSessions(); + + /** + * + */ + boolean isUseRAManagedTransactionEnabled(); + + /** + * + */ + String getEnableBatch(); + + /** + * + */ + boolean getEnableBatchBooleanValue(); + + /** + * + */ + int getMaxMessagesPerBatchIntValue(); + + /** + * + */ + String getMaxMessagesPerBatch(); +} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java new file mode 100755 index 0000000000..4f6729bf1b --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java @@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.ra; + +import javax.jms.JMSException; +import javax.resource.spi.BootstrapContext; +import javax.resource.spi.ResourceAdapter; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * Knows how to connect to one ActiveMQ server. It can then activate endpoints + * and deliver messages to those end points using the connection configure in + * the resource adapter.

Must override equals and hashCode (JCA spec 16.4) + * + * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true" + * description="The JCA Resource Adaptor for ActiveMQ" + * @version $Revision$ + */ +public interface MessageResourceAdapter + extends ResourceAdapter +{ + + /** + */ + public ActiveMQConnection makeConnection() + throws JMSException; + + /** + */ + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) + throws JMSException; + + /** + */ + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) + throws JMSException; + + /** + * @param activationSpec + */ + public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) + throws JMSException; + + /** + * @return bootstrap context + */ + public BootstrapContext getBootstrapContext(); + + /** + */ + public String getBrokerXmlConfig(); + + /** + * @return Returns the info. + */ + public ActiveMQConnectionRequestInfo getInfo(); + + /** + */ + public ActiveMQConnectionFactory getConnectionFactory(); + +} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index 3430867346..4e56e1ceaf 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -17,9 +17,7 @@ */ package org.apache.activemq.ra; -import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import javax.jms.JMSException; @@ -58,7 +56,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } private ServerSessionImpl createServerSessionImpl() throws JMSException { - ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); + MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge); MessageEndpoint endpoint; @@ -130,7 +128,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } /** - * @param message + * @param messageDispatch the message to dispatch * @throws JMSException */ private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException {