diff --git a/activemq-ra/pom.xml b/activemq-ra/pom.xml index e86b4c5774..0b15884016 100755 --- a/activemq-ra/pom.xml +++ b/activemq-ra/pom.xml @@ -127,10 +127,10 @@ false true - org.apache.activemq.ra.ServerSessionImplTest + **/*Test.* diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java index 6550c71333..1187607dc0 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java @@ -42,19 +42,22 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec private static final Log LOG = LogFactory.getLog(ActiveMQConnectionFactory.class); private ConnectionManager manager; - private transient ActiveMQManagedConnectionFactory factory; + private ActiveMQManagedConnectionFactory factory; private Reference reference; private final ActiveMQConnectionRequestInfo info; /** * @param factory * @param manager - * @param info + * @param connectionRequestInfo */ - public ActiveMQConnectionFactory(ActiveMQManagedConnectionFactory factory, ConnectionManager manager, ActiveMQConnectionRequestInfo info) { + public ActiveMQConnectionFactory( + ActiveMQManagedConnectionFactory factory, + ConnectionManager manager, + ActiveMQConnectionRequestInfo connectionRequestInfo) { this.factory = factory; this.manager = manager; - this.info = info; + this.info = connectionRequestInfo; } /** @@ -76,19 +79,19 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec } /** - * @param info + * @param connectionRequestInfo * @return * @throws JMSException */ - private Connection createConnection(ActiveMQConnectionRequestInfo info) throws JMSException { + private Connection createConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException { try { - if (info.isUseInboundSessionEnabled()) { + if (connectionRequestInfo.isUseInboundSessionEnabled()) { return new InboundConnectionProxy(); } if (manager == null) { throw new JMSException("No JCA ConnectionManager configured! Either enable UseInboundSessionEnabled or get your JCA container to configure one."); } - return (Connection)manager.allocateConnection(factory, info); + return (Connection)manager.allocateConnection(factory, connectionRequestInfo); } catch (ResourceException e) { // Throw the root cause if it was a JMSException.. if (e.getCause() instanceof JMSException) { diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java index 05815a7209..eee323960a 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java @@ -175,9 +175,14 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser this.clientid = clientid; } + @Override public String toString() { - return "ActiveMQConnectionRequestInfo{ " + "userName = '" + userName + "' " + ", serverUrl = '" + serverUrl + "' " + ", clientid = '" + clientid + "' " + ", userName = '" + userName + "' " - + ", useInboundSession = '" + useInboundSession + "' " + " }"; + return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ") + .append(", serverUrl = '").append(serverUrl).append("' ") + .append(", clientid = '").append(clientid).append("' ") + .append(", userName = '").append(userName).append("' ") + .append(", useInboundSession = '").append(useInboundSession).append("' }") + .toString(); } public Boolean getUseInboundSession() { diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java new file mode 100644 index 0000000000..f5f36ec660 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.JMSException; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Abstract base class providing support for creating physical + * connections to an ActiveMQ instance. + * + * @version $Revision$ + */ +public class ActiveMQConnectionSupport { + + private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); + protected Log log = LogFactory.getLog(getClass()); + + /** + * Creates a factory for obtaining physical connections to an Active MQ + * broker. The factory is configured with the given configuration information. + * + * @param connectionRequestInfo the configuration request information + * @return the connection factory + * @throws java.lang.IllegalArgumentException if the server URL given in the + * configuration information is not a valid URL + */ + protected ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo connectionRequestInfo) { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + connectionRequestInfo.configure(factory); + return factory; + } + + /** + * Creates a new physical connection to an Active MQ broker identified by given + * connection request information. + * + * @param connectionRequestInfo the connection request information identifying the broker and any + * required connection parameters, e.g. username/password + * @return the physical connection + * @throws JMSException if the connection could not be established + */ + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException{ + return makeConnection(connectionRequestInfo, createConnectionFactory(connectionRequestInfo)); + } + + /** + * Creates a new physical connection to an Active MQ broker using a given + * connection factory and credentials supplied in connection request information. + * + * @param connectionRequestInfo the connection request information containing the credentials to use + * for the connection request + * @return the physical connection + * @throws JMSException if the connection could not be established + */ + protected ActiveMQConnection makeConnection( + ActiveMQConnectionRequestInfo connectionRequestInfo, + ActiveMQConnectionFactory connectionFactory) throws JMSException + { + String userName = connectionRequestInfo.getUserName(); + String password = connectionRequestInfo.getPassword(); + ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); + + String clientId = connectionRequestInfo.getClientid(); + if ( clientId != null && clientId.length() > 0 ) + { + physicalConnection.setClientID(clientId); + } + return physicalConnection; + } + + /** + * Gets the connection request information. + * + * @return the connection request information + */ + public ActiveMQConnectionRequestInfo getInfo() + { + return info; + } + + /** + * Sets the connection request information as a whole. + * + * @param the connection request information + */ + protected void setInfo(ActiveMQConnectionRequestInfo connectionRequestInfo){ + info = connectionRequestInfo; + } + + protected boolean notEqual(Object o1, Object o2) { + return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); + } + + protected String emptyToNull(String value) { + if (value == null || value.length() == 0) + { + return null; + } + else + { + return value; + } + } + + protected String defaultValue(String value, String defaultValue) { + if (value != null) { + return value; + } + return defaultValue; + } + + // /////////////////////////////////////////////////////////////////////// + // + // Java Bean getters and setters for this ResourceAdapter class. + // + // /////////////////////////////////////////////////////////////////////// + + /** + * @return client id + */ + public String getClientid() { + return emptyToNull(info.getClientid()); + } + + /** + * @param clientid + */ + public void setClientid(String clientid) { + if ( log.isDebugEnabled() ) { + log.debug("setting [clientid] to: " + clientid); + } + info.setClientid(clientid); + } + + /** + * @return password + */ + public String getPassword() { + return emptyToNull(info.getPassword()); + } + + /** + * @param password + */ + public void setPassword(String password) { + if ( log.isDebugEnabled() ) { + log.debug("setting [password] property"); + } + info.setPassword(password); + } + + /** + * @return server URL + */ + public String getServerUrl() { + return info.getServerUrl(); + } + + /** + * @param url + */ + public void setServerUrl(String url) { + if ( log.isDebugEnabled() ) { + log.debug("setting [serverUrl] to: " + url); + } + info.setServerUrl(url); + } + + /** + * @return user name + */ + public String getUserName() { + return emptyToNull(info.getUserName()); + } + + /** + * @param userid + */ + public void setUserName(String userid) { + if ( log.isDebugEnabled() ) { + log.debug("setting [userName] to: " + userid); + } + info.setUserName(userid); + } + + /** + * @return durable topic prefetch + */ + public Integer getDurableTopicPrefetch() { + return info.getDurableTopicPrefetch(); + } + + /** + * @param durableTopicPrefetch + */ + public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { + if ( log.isDebugEnabled() ) { + log.debug("setting [durableTopicPrefetch] to: " + durableTopicPrefetch); + } + info.setDurableTopicPrefetch(durableTopicPrefetch); + } + + /** + * @return initial redelivery delay + */ + public Long getInitialRedeliveryDelay() { + return info.getInitialRedeliveryDelay(); + } + + /** + * @param value + */ + public void setInitialRedeliveryDelay(Long value) { + if ( log.isDebugEnabled() ) { + log.debug("setting [initialRedeliveryDelay] to: " + value); + } + info.setInitialRedeliveryDelay(value); + } + + /** + * @return input stream prefetch + */ + public Integer getInputStreamPrefetch() { + return info.getInputStreamPrefetch(); + } + + /** + * @param inputStreamPrefetch + */ + public void setInputStreamPrefetch(Integer inputStreamPrefetch) { + if ( log.isDebugEnabled() ) { + log.debug("setting [inputStreamPrefetch] to: " + inputStreamPrefetch); + } + info.setInputStreamPrefetch(inputStreamPrefetch); + } + + /** + * @return maximum redeliveries + */ + public Integer getMaximumRedeliveries() { + return info.getMaximumRedeliveries(); + } + + /** + * @param value + */ + public void setMaximumRedeliveries(Integer value) { + if ( log.isDebugEnabled() ) { + log.debug("setting [maximumRedeliveries] to: " + value); + } + info.setMaximumRedeliveries(value); + } + + /** + * @return queue browser prefetch + */ + public Integer getQueueBrowserPrefetch() { + return info.getQueueBrowserPrefetch(); + } + + /** + * @param queueBrowserPrefetch + */ + public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { + if ( log.isDebugEnabled() ) { + log.debug("setting [queueBrowserPrefetch] to: " + queueBrowserPrefetch); + } + info.setQueueBrowserPrefetch(queueBrowserPrefetch); + } + + /** + * @return queue prefetch + */ + public Integer getQueuePrefetch() { + return info.getQueuePrefetch(); + } + + /** + * @param queuePrefetch + */ + public void setQueuePrefetch(Integer queuePrefetch) { + if ( log.isDebugEnabled() ) { + log.debug("setting [queuePrefetch] to: " + queuePrefetch); + } + info.setQueuePrefetch(queuePrefetch); + } + + /** + * @return redelivery backoff multiplier + */ + public Short getRedeliveryBackOffMultiplier() { + return info.getRedeliveryBackOffMultiplier(); + } + + /** + * @param value + */ + public void setRedeliveryBackOffMultiplier(Short value) { + if ( log.isDebugEnabled() ) { + log.debug("setting [redeliveryBackOffMultiplier] to: " + value); + } + info.setRedeliveryBackOffMultiplier(value); + } + + /** + * @return redelivery use exponential backoff + */ + public Boolean getRedeliveryUseExponentialBackOff() { + return info.getRedeliveryUseExponentialBackOff(); + } + + /** + * @param value + */ + public void setRedeliveryUseExponentialBackOff(Boolean value) { + if ( log.isDebugEnabled() ) { + log.debug("setting [redeliveryUseExponentialBackOff] to: " + value); + } + info.setRedeliveryUseExponentialBackOff(value); + } + + /** + * @return topic prefetch + */ + public Integer getTopicPrefetch() { + return info.getTopicPrefetch(); + } + + /** + * @param topicPrefetch + */ + public void setTopicPrefetch(Integer topicPrefetch) { + if ( log.isDebugEnabled() ) { + log.debug("setting [topicPrefetch] to: " + topicPrefetch); + } + info.setTopicPrefetch(topicPrefetch); + } + + /** + * @param i + */ + public void setAllPrefetchValues(Integer i) { + info.setAllPrefetchValues(i); + } + + /** + * @return use inbound session enabled + */ + public boolean isUseInboundSessionEnabled() { + return info.isUseInboundSessionEnabled(); + } + + /** + * @return use inbound session + */ + public Boolean getUseInboundSession() { + return info.getUseInboundSession(); + } + + /** + * @param useInboundSession + */ + public void setUseInboundSession(Boolean useInboundSession) { + if ( log.isDebugEnabled() ) { + log.debug("setting [useInboundSession] to: " + useInboundSession); + } + info.setUseInboundSession(useInboundSession); + } + +} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java index 9699af0b27..5b8792ca6a 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java @@ -16,7 +16,11 @@ */ package org.apache.activemq.ra; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.PrintWriter; +import java.io.Serializable; import java.util.Iterator; import java.util.Set; @@ -29,18 +33,17 @@ import javax.resource.spi.ManagedConnectionFactory; import javax.resource.spi.ResourceAdapter; import javax.resource.spi.ResourceAdapterAssociation; import javax.security.auth.Subject; +import org.apache.commons.logging.LogFactory; /** * @version $Revisio n$ TODO: Must override equals and hashCode (JCA spec 16.4) * @org.apache.xbean.XBean element="managedConnectionFactory" */ -public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation { +public class ActiveMQManagedConnectionFactory extends ActiveMQConnectionSupport + implements ManagedConnectionFactory, ResourceAdapterAssociation { private static final long serialVersionUID = 6196921962230582875L; - - private MessageResourceAdapter adapter; private PrintWriter logWriter; - private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); /** * @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter) @@ -49,23 +52,35 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor if (!(adapter instanceof MessageResourceAdapter)) { throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName()); } - this.adapter = (MessageResourceAdapter)adapter; - ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy(); - if (info.getClientid() == null) { - info.setClientid(baseInfo.getClientid()); + else + { + if ( log.isDebugEnabled() ) { + log.debug("copying standard ResourceAdapter configuration properties"); } - if (info.getPassword() == null) { - info.setPassword(baseInfo.getPassword()); + ActiveMQConnectionRequestInfo baseInfo = ((MessageResourceAdapter) adapter).getInfo().copy(); + if (getClientid() == null) { + setClientid(baseInfo.getClientid()); } - if (info.getServerUrl() == null) { - info.setServerUrl(baseInfo.getServerUrl()); + if (getPassword() == null) { + setPassword(baseInfo.getPassword()); } - if (info.getUseInboundSession() == null) { - info.setUseInboundSession(baseInfo.getUseInboundSession()); + if (getServerUrl() == null) { + setServerUrl(baseInfo.getServerUrl()); } - if (info.getUserName() == null) { - info.setUserName(baseInfo.getUserName()); + if (getUseInboundSession() == null) { + setUseInboundSession(baseInfo.getUseInboundSession()); } + if (getUserName() == null) { + setUserName(baseInfo.getUserName()); + } + } + } + + /** + * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter() + */ + public ResourceAdapter getResourceAdapter() { + return null; } /** @@ -76,7 +91,7 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor if (object == null || object.getClass() != ActiveMQManagedConnectionFactory.class) { return false; } - return ((ActiveMQManagedConnectionFactory)object).info.equals(info); + return ((ActiveMQManagedConnectionFactory)object).getInfo().equals(getInfo()); } /** @@ -84,21 +99,47 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor */ @Override public int hashCode() { - return info.hashCode(); + return getInfo().hashCode(); } /** - * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter() + * Writes this factory during serialization along with the superclass' info property. + * This needs to be done manually since the superclass is not serializable itself. + * + * @param out the stream to write object state to + * @throws java.io.IOException if the object cannot be serialized */ - public ResourceAdapter getResourceAdapter() { - return adapter; + private void writeObject(ObjectOutputStream out) throws IOException { + if ( logWriter != null && !(logWriter instanceof Serializable) ) { + // if the PrintWriter injected by the application server is not + // serializable we just drop the reference and let the application + // server re-inject a PrintWriter later (after this factory has been + // deserialized again) using the standard setLogWriter() method + logWriter = null; + } + out.defaultWriteObject(); + out.writeObject(getInfo()); } + /** + * Restores this factory along with the superclass' info property. + * This needs to be done manually since the superclass is not serializable itself. + * + * @param in the stream to read object state from + * @throws java.io.IOException if the object state could not be restored + * @throws java.lang.ClassNotFoundException if the object state could not be restored + */ + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + setInfo((ActiveMQConnectionRequestInfo) in.readObject()); + log = LogFactory.getLog(getClass()); + } + /** * @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory(javax.resource.spi.ConnectionManager) */ public Object createConnectionFactory(ConnectionManager manager) throws ResourceException { - return new ActiveMQConnectionFactory(this, manager, info); + return new ActiveMQConnectionFactory(this, manager, getInfo()); } /** @@ -110,20 +151,22 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor * @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory() */ public Object createConnectionFactory() throws ResourceException { - return new ActiveMQConnectionFactory(this, new SimpleConnectionManager(), info); + return new ActiveMQConnectionFactory(this, new SimpleConnectionManager(), getInfo()); } /** * @see javax.resource.spi.ManagedConnectionFactory#createManagedConnection(javax.security.auth.Subject, * javax.resource.spi.ConnectionRequestInfo) */ - public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException { - try { - if (info == null) { - info = this.info; + public ManagedConnection createManagedConnection( + Subject subject, + ConnectionRequestInfo connectionRequestInfo) throws ResourceException { + ActiveMQConnectionRequestInfo amqInfo = getInfo(); + if ( connectionRequestInfo instanceof ActiveMQConnectionRequestInfo ) { + amqInfo = (ActiveMQConnectionRequestInfo) connectionRequestInfo; } - ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo)info; - return new ActiveMQManagedConnection(subject, adapter.makeConnection(amqInfo), amqInfo); + try { + return new ActiveMQManagedConnection(subject, makeConnection(amqInfo), amqInfo); } catch (JMSException e) { throw new ResourceException("Could not create connection.", e); } @@ -134,13 +177,16 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor * javax.security.auth.Subject, * javax.resource.spi.ConnectionRequestInfo) */ - public ManagedConnection matchManagedConnections(Set connections, Subject subject, ConnectionRequestInfo info) throws ResourceException { + public ManagedConnection matchManagedConnections( + Set connections, + Subject subject, + ConnectionRequestInfo connectionRequestInfo) throws ResourceException { Iterator iterator = connections.iterator(); while (iterator.hasNext()) { ActiveMQManagedConnection c = (ActiveMQManagedConnection)iterator.next(); - if (c.matches(subject, info)) { + if (c.matches(subject, connectionRequestInfo)) { try { - c.associate(subject, (ActiveMQConnectionRequestInfo)info); + c.associate(subject, (ActiveMQConnectionRequestInfo) connectionRequestInfo); return c; } catch (JMSException e) { throw new ResourceException(e); @@ -153,221 +199,21 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor /** * @see javax.resource.spi.ManagedConnectionFactory#setLogWriter(java.io.PrintWriter) */ - public void setLogWriter(PrintWriter logWriter) throws ResourceException { - this.logWriter = logWriter; + public void setLogWriter(PrintWriter aLogWriter) throws ResourceException { + if ( log.isTraceEnabled() ) { + log.trace("setting log writer [" + aLogWriter + "]"); + } + this.logWriter = aLogWriter; } /** * @see javax.resource.spi.ManagedConnectionFactory#getLogWriter() */ public PrintWriter getLogWriter() throws ResourceException { + if ( log.isTraceEnabled() ) { + log.trace("getting log writer [" + logWriter + "]"); + } return logWriter; } - // ///////////////////////////////////////////////////////////////////////// - // - // Bean setters and getters. - // - // ///////////////////////////////////////////////////////////////////////// - - /** - * - */ - public String getClientid() { - return info.getClientid(); } - - /** - * - */ - public String getPassword() { - return info.getPassword(); - } - - /** - * - */ - public String getUserName() { - return info.getUserName(); - } - - /** - * - */ - public void setClientid(String clientid) { - info.setClientid(clientid); - } - - /** - * - */ - public void setPassword(String password) { - info.setPassword(password); - } - - /** - * - */ - public void setUserName(String userid) { - info.setUserName(userid); - } - - /** - * - */ - /** - * - */ - public Boolean getUseInboundSession() { - return info.getUseInboundSession(); - } - - /** - * - */ - public void setUseInboundSession(Boolean useInboundSession) { - info.setUseInboundSession(useInboundSession); - } - - /** - * - */ - public boolean isUseInboundSessionEnabled() { - return info.isUseInboundSessionEnabled(); - } - - // Redelivery policy configuration - /** - * - */ - public Long getInitialRedeliveryDelay() { - return info.getInitialRedeliveryDelay(); - } - - /** - * - */ - public Integer getMaximumRedeliveries() { - return info.getMaximumRedeliveries(); - } - - /** - * - */ - public Short getRedeliveryBackOffMultiplier() { - return info.getRedeliveryBackOffMultiplier(); - } - - /** - * - */ - public Boolean getRedeliveryUseExponentialBackOff() { - return info.getRedeliveryUseExponentialBackOff(); - } - - /** - * - */ - public void setInitialRedeliveryDelay(Long value) { - info.setInitialRedeliveryDelay(value); - } - - /** - * - */ - public void setMaximumRedeliveries(Integer value) { - info.setMaximumRedeliveries(value); - } - - /** - * - */ - public void setRedeliveryBackOffMultiplier(Short value) { - info.setRedeliveryBackOffMultiplier(value); - } - - /** - * - */ - public void setRedeliveryUseExponentialBackOff(Boolean value) { - info.setRedeliveryUseExponentialBackOff(value); - } - - // Prefetch policy configuration - /** - * - */ - public Integer getDurableTopicPrefetch() { - return info.getDurableTopicPrefetch(); - } - - /** - * - */ - public Integer getInputStreamPrefetch() { - return info.getInputStreamPrefetch(); - } - - /** - * - */ - public Integer getQueueBrowserPrefetch() { - return info.getQueueBrowserPrefetch(); - } - - /** - * - */ - public Integer getQueuePrefetch() { - return info.getQueuePrefetch(); - } - - /** - * - */ - public Integer getTopicPrefetch() { - return info.getTopicPrefetch(); - } - - /** - * - */ - public void setAllPrefetchValues(Integer i) { - info.setAllPrefetchValues(i); - } - - /** - * - */ - public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { - info.setDurableTopicPrefetch(durableTopicPrefetch); - } - - /** - * - */ - public void setInputStreamPrefetch(Integer inputStreamPrefetch) { - info.setInputStreamPrefetch(inputStreamPrefetch); - } - - /** - * - */ - public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { - info.setQueueBrowserPrefetch(queueBrowserPrefetch); - } - - /** - * - */ - public void setQueuePrefetch(Integer queuePrefetch) { - info.setQueuePrefetch(queuePrefetch); - } - - /** - * @param topicPrefetch - */ - public void setTopicPrefetch(Integer topicPrefetch) { - info.setTopicPrefetch(topicPrefetch); - } -} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 69881989ae..d290e974d6 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.ra; -import java.io.Serializable; import java.net.URI; -import java.net.URISyntaxException; import java.util.HashMap; import javax.jms.Connection; @@ -39,8 +37,6 @@ import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.ServiceSupport; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** * Knows how to connect to one ActiveMQ server. It can then activate endpoints @@ -51,18 +47,13 @@ import org.apache.commons.logging.LogFactory; * description="The JCA Resource Adaptor for ActiveMQ" * @version $Revision$ */ -public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable { - - private static final long serialVersionUID = -5417363537865649130L; - private static final Log LOG = LogFactory.getLog(ActiveMQResourceAdapter.class); +public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter { private final HashMap endpointWorkers = new HashMap(); - private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); private BootstrapContext bootstrapContext; private String brokerXmlConfig; private BrokerService broker; - private ActiveMQConnectionFactory connectionFactory; private Thread brokerStartThread; /** @@ -86,8 +77,8 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ } broker.start(); } catch (Throwable e) { - LOG.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage()); - LOG.debug("Reason for: "+e.getMessage(), e); + log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage()); + log.debug("Reason for: "+e.getMessage(), e); } } }; @@ -107,49 +98,22 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection() */ public ActiveMQConnection makeConnection() throws JMSException { - if (connectionFactory != null) { - return makeConnection(info, connectionFactory); + return makeConnection(getInfo()); } - return makeConnection(info); - } - - /** - */ - public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException { - - ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); - return makeConnection(info, connectionFactory); - } - - /** - * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo, - * org.apache.activemq.ActiveMQConnectionFactory) - */ - public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException { - String userName = info.getUserName(); - String password = info.getPassword(); - ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password); - - String clientId = info.getClientid(); - if (clientId != null && clientId.length() > 0) { - physicalConnection.setClientID(clientId); - } - return physicalConnection; - } /** * @param activationSpec */ public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { - ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info); - String userName = defaultValue(activationSpec.getUserName(), info.getUserName()); - String password = defaultValue(activationSpec.getPassword(), info.getPassword()); + ActiveMQConnectionFactory connectionFactory = createConnectionFactory(getInfo()); + String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName()); + String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword()); String clientId = activationSpec.getClientId(); if (clientId != null) { connectionFactory.setClientID(clientId); } else { if (activationSpec.isDurableSubscription()) { - LOG.warn("No clientID specified for durable subscription: " + activationSpec); + log.warn("No clientID specified for durable subscription: " + activationSpec); } } ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password); @@ -162,29 +126,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ return physicalConnection; } - /** - * @param info - * @throws JMSException - * @throws URISyntaxException - */ - private synchronized ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException { - ActiveMQConnectionFactory factory = connectionFactory; - if (factory != null && info.isConnectionFactoryConfigured()) { - factory = factory.copy(); - } else if (factory == null) { - factory = new ActiveMQConnectionFactory(); - } - info.configure(factory); - return factory; - } - - private String defaultValue(String value, String defaultValue) { - if (value != null) { - return value; - } - return defaultValue; - } - /** * @see javax.resource.spi.ResourceAdapter#stop() */ @@ -306,62 +247,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ // // /////////////////////////////////////////////////////////////////////// - /** - * @return client id - */ - public String getClientid() { - return emptyToNull(info.getClientid()); - } - - /** - * @return password - */ - public String getPassword() { - return emptyToNull(info.getPassword()); - } - - /** - * @return server URL - */ - public String getServerUrl() { - return info.getServerUrl(); - } - - /** - * @return user name - */ - public String getUserName() { - return emptyToNull(info.getUserName()); - } - - /** - * @param clientid - */ - public void setClientid(String clientid) { - info.setClientid(clientid); - } - - /** - * @param password - */ - public void setPassword(String password) { - info.setPassword(password); - } - - /** - * @param url - */ - public void setServerUrl(String url) { - info.setServerUrl(url); - } - - /** - * @param userid - */ - public void setUserName(String userid) { - info.setUserName(userid); - } - /** * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig() */ @@ -384,153 +269,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ this.brokerXmlConfig = brokerXmlConfig; } - /** - * @return durable topic prefetch - */ - public Integer getDurableTopicPrefetch() { - return info.getDurableTopicPrefetch(); - } - - /** - * @return initial redelivery delay - */ - public Long getInitialRedeliveryDelay() { - return info.getInitialRedeliveryDelay(); - } - - /** - * @return input stream prefetch - */ - public Integer getInputStreamPrefetch() { - return info.getInputStreamPrefetch(); - } - - /** - * @return maximum redeliveries - */ - public Integer getMaximumRedeliveries() { - return info.getMaximumRedeliveries(); - } - - /** - * @return queue browser prefetch - */ - public Integer getQueueBrowserPrefetch() { - return info.getQueueBrowserPrefetch(); - } - - /** - * @return queue prefetch - */ - public Integer getQueuePrefetch() { - return info.getQueuePrefetch(); - } - - /** - * @return redelivery backoff multiplier - */ - public Short getRedeliveryBackOffMultiplier() { - return info.getRedeliveryBackOffMultiplier(); - } - - /** - * @return redelivery use exponential backoff - */ - public Boolean getRedeliveryUseExponentialBackOff() { - return info.getRedeliveryUseExponentialBackOff(); - } - - /** - * @return topic prefetch - */ - public Integer getTopicPrefetch() { - return info.getTopicPrefetch(); - } - - /** - * @return use inbound session enabled - */ - public boolean isUseInboundSessionEnabled() { - return info.isUseInboundSessionEnabled(); - } - - /** - * @param i - */ - public void setAllPrefetchValues(Integer i) { - info.setAllPrefetchValues(i); - } - - /** - * @param durableTopicPrefetch - */ - public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { - info.setDurableTopicPrefetch(durableTopicPrefetch); - } - - /** - * @param value - */ - public void setInitialRedeliveryDelay(Long value) { - info.setInitialRedeliveryDelay(value); - } - - /** - * @param inputStreamPrefetch - */ - public void setInputStreamPrefetch(Integer inputStreamPrefetch) { - info.setInputStreamPrefetch(inputStreamPrefetch); - } - - /** - * @param value - */ - public void setMaximumRedeliveries(Integer value) { - info.setMaximumRedeliveries(value); - } - - /** - * @param queueBrowserPrefetch - */ - public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { - info.setQueueBrowserPrefetch(queueBrowserPrefetch); - } - - /** - * @param queuePrefetch - */ - public void setQueuePrefetch(Integer queuePrefetch) { - info.setQueuePrefetch(queuePrefetch); - } - - /** - * @param value - */ - public void setRedeliveryBackOffMultiplier(Short value) { - info.setRedeliveryBackOffMultiplier(value); - } - - /** - * @param value - */ - public void setRedeliveryUseExponentialBackOff(Boolean value) { - info.setRedeliveryUseExponentialBackOff(value); - } - - /** - * @param topicPrefetch - */ - public void setTopicPrefetch(Integer topicPrefetch) { - info.setTopicPrefetch(topicPrefetch); - } - - /** - * @return Returns the info. - */ - public ActiveMQConnectionRequestInfo getInfo() { - return info; - } - /** * @see java.lang.Object#equals(java.lang.Object) */ @@ -545,7 +283,7 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o; - if (!info.equals(activeMQResourceAdapter.getInfo())) { + if (!getInfo().equals(activeMQResourceAdapter.getInfo())) { return false; } if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) { @@ -555,60 +293,18 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ return true; } - private boolean notEqual(Object o1, Object o2) { - return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); - } - /** * @see java.lang.Object#hashCode() */ @Override public int hashCode() { int result; - result = info.hashCode(); + result = getInfo().hashCode(); if (brokerXmlConfig != null) { result ^= brokerXmlConfig.hashCode(); } return result; } - private String emptyToNull(String value) { - if (value == null || value.length() == 0) { - return null; - } - return value; - } - /** - * @return use inbound session - */ - public Boolean getUseInboundSession() { - return info.getUseInboundSession(); } - - /** - * @param useInboundSession - */ - public void setUseInboundSession(Boolean useInboundSession) { - info.setUseInboundSession(useInboundSession); - } - - /** - * @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory() - */ - public ActiveMQConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - /** - * This allows a connection factory to be configured and shared between a - * ResourceAdaptor and outbound messaging. Note that setting the - * connectionFactory will overload many of the properties on this POJO such - * as the redelivery and prefetch policies; the properties on the - * connectionFactory will be used instead. - */ - public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - } - -} diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java index f7c1c44701..248d7ce1ef 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java @@ -22,7 +22,6 @@ import javax.resource.spi.BootstrapContext; import javax.resource.spi.ResourceAdapter; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; /** * Knows how to connect to one ActiveMQ server. It can then activate endpoints @@ -33,18 +32,10 @@ import org.apache.activemq.ActiveMQConnectionFactory; */ interface MessageResourceAdapter extends ResourceAdapter { - /** - */ - ActiveMQConnection makeConnection() throws JMSException; - /** */ ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException; - /** - */ - ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException; - /** * @param activationSpec */ @@ -64,8 +55,4 @@ interface MessageResourceAdapter extends ResourceAdapter { */ ActiveMQConnectionRequestInfo getInfo(); - /** - */ - ActiveMQConnectionFactory getConnectionFactory(); - } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java new file mode 100644 index 0000000000..4ed767d904 --- /dev/null +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2008 hak8fe. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * under the License. + */ + +package org.apache.activemq.ra; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import javax.jms.Connection; +import junit.framework.TestCase; + +/** + * + * @author hak8fe + */ +public class ActiveMQConnectionFactoryTest extends TestCase { + + ActiveMQManagedConnectionFactory mcf; + ActiveMQConnectionRequestInfo info; + String url = "vm://localhost"; + String user = "defaultUser"; + String pwd = "defaultPasswd"; + + public ActiveMQConnectionFactoryTest(String testName) { + super(testName); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + mcf = new ActiveMQManagedConnectionFactory(); + info = new ActiveMQConnectionRequestInfo(); + info.setServerUrl(url); + info.setUserName(user); + info.setPassword(pwd); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testSerializability() throws Exception + { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(mcf, new ConnectionManagerAdapter(), info); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(factory); + oos.close(); + byte[] byteArray = bos.toByteArray(); + + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray)); + ActiveMQConnectionFactory deserializedFactory = (ActiveMQConnectionFactory) ois.readObject(); + ois.close(); + + Connection con = deserializedFactory.createConnection("defaultUser", "defaultPassword"); + assertNotNull("Connection object returned by ActiveMQConnectionFactory.createConnection() is null", con); + } + +} diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java index 5734b0e4a8..db64a4b259 100755 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java @@ -16,9 +16,13 @@ */ package org.apache.activemq.ra; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.PrintWriter; import java.io.Serializable; import java.util.HashSet; -import java.util.Timer; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -27,13 +31,9 @@ import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnectionFactory; import javax.resource.Referenceable; import javax.resource.ResourceException; -import javax.resource.spi.BootstrapContext; import javax.resource.spi.ConnectionRequestInfo; import javax.resource.spi.ManagedConnection; import javax.resource.spi.ManagedConnectionFactory; -import javax.resource.spi.UnavailableException; -import javax.resource.spi.XATerminator; -import javax.resource.spi.work.WorkManager; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; @@ -52,27 +52,10 @@ public class ManagedConnectionFactoryTest extends TestCase { */ protected void setUp() throws Exception { - ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); - adapter.setServerUrl(DEFAULT_HOST); - adapter.setUserName(ActiveMQConnectionFactory.DEFAULT_USER); - adapter.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD); - adapter.start(new BootstrapContext() { - public WorkManager getWorkManager() { - return null; - } - - public XATerminator getXATerminator() { - return null; - } - - public Timer createTimer() throws UnavailableException { - return null; - } - }); - managedConnectionFactory = new ActiveMQManagedConnectionFactory(); - managedConnectionFactory.setResourceAdapter(adapter); - + managedConnectionFactory.setServerUrl(DEFAULT_HOST); + managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER); + managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD); } public void testConnectionFactoryAllocation() throws ResourceException, JMSException { @@ -155,4 +138,33 @@ public class ManagedConnectionFactoryTest extends TestCase { assertTrue(cf instanceof TopicConnectionFactory); } + public void testSerializability() throws Exception { + + managedConnectionFactory.setLogWriter(new PrintWriter(new ByteArrayOutputStream())); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(managedConnectionFactory); + oos.close(); + byte[] byteArray = bos.toByteArray(); + + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray)); + ActiveMQManagedConnectionFactory deserializedFactory = (ActiveMQManagedConnectionFactory) ois.readObject(); + ois.close(); + + assertNull( + "[logWriter] property of deserialized ActiveMQManagedConnectionFactory is not null", + deserializedFactory.getLogWriter()); + assertNotNull( + "ConnectionRequestInfo of deserialized ActiveMQManagedConnectionFactory is null", + deserializedFactory.getInfo()); + assertEquals( + "[serverUrl] property of deserialized ConnectionRequestInfo object is not [" + DEFAULT_HOST + "]", + DEFAULT_HOST, + deserializedFactory.getInfo().getServerUrl()); + assertNotNull( + "Log instance of deserialized ActiveMQManagedConnectionFactory is null", + deserializedFactory.log); +} + } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionTest.java index 5917530e4b..f9bfcfa8fc 100755 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionTest.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.ra; -import java.util.Timer; - import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; @@ -28,11 +26,7 @@ import javax.jms.Session; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.resource.ResourceException; -import javax.resource.spi.BootstrapContext; import javax.resource.spi.ConnectionEvent; -import javax.resource.spi.UnavailableException; -import javax.resource.spi.XATerminator; -import javax.resource.spi.work.WorkManager; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; @@ -55,26 +49,10 @@ public class ManagedConnectionTest extends TestCase { */ protected void setUp() throws Exception { - ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); - adapter.setServerUrl(DEFAULT_HOST); - adapter.setUserName(ActiveMQConnectionFactory.DEFAULT_USER); - adapter.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD); - adapter.start(new BootstrapContext() { - public WorkManager getWorkManager() { - return null; - } - - public XATerminator getXATerminator() { - return null; - } - - public Timer createTimer() throws UnavailableException { - return null; - } - }); - managedConnectionFactory = new ActiveMQManagedConnectionFactory(); - managedConnectionFactory.setResourceAdapter(adapter); + managedConnectionFactory.setServerUrl(DEFAULT_HOST); + managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER); + managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD); connectionFactory = (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager); connection = (ManagedConnectionProxy)connectionFactory.createConnection();