git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@648727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-16 15:47:24 +00:00
parent ad69a95979
commit 8733d172ae
10 changed files with 620 additions and 629 deletions

View File

@ -127,10 +127,10 @@
<childDelegation>false</childDelegation>
<useFile>true</useFile>
<includes>
<include>org.apache.activemq.ra.ServerSessionImplTest</include>
<!--
<include>**/*Test.*</include>
<include>org.apache.activemq.ra.ServerSessionImplTest</include>
-->
<include>**/*Test.*</include>
</includes>
</configuration>
</plugin>

View File

@ -42,19 +42,22 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
private static final Log LOG = LogFactory.getLog(ActiveMQConnectionFactory.class);
private ConnectionManager manager;
private transient ActiveMQManagedConnectionFactory factory;
private ActiveMQManagedConnectionFactory factory;
private Reference reference;
private final ActiveMQConnectionRequestInfo info;
/**
* @param factory
* @param manager
* @param info
* @param connectionRequestInfo
*/
public ActiveMQConnectionFactory(ActiveMQManagedConnectionFactory factory, ConnectionManager manager, ActiveMQConnectionRequestInfo info) {
public ActiveMQConnectionFactory(
ActiveMQManagedConnectionFactory factory,
ConnectionManager manager,
ActiveMQConnectionRequestInfo connectionRequestInfo) {
this.factory = factory;
this.manager = manager;
this.info = info;
this.info = connectionRequestInfo;
}
/**
@ -76,19 +79,19 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
}
/**
* @param info
* @param connectionRequestInfo
* @return
* @throws JMSException
*/
private Connection createConnection(ActiveMQConnectionRequestInfo info) throws JMSException {
private Connection createConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException {
try {
if (info.isUseInboundSessionEnabled()) {
if (connectionRequestInfo.isUseInboundSessionEnabled()) {
return new InboundConnectionProxy();
}
if (manager == null) {
throw new JMSException("No JCA ConnectionManager configured! Either enable UseInboundSessionEnabled or get your JCA container to configure one.");
}
return (Connection)manager.allocateConnection(factory, info);
return (Connection)manager.allocateConnection(factory, connectionRequestInfo);
} catch (ResourceException e) {
// Throw the root cause if it was a JMSException..
if (e.getCause() instanceof JMSException) {

View File

@ -175,9 +175,14 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
this.clientid = clientid;
}
@Override
public String toString() {
return "ActiveMQConnectionRequestInfo{ " + "userName = '" + userName + "' " + ", serverUrl = '" + serverUrl + "' " + ", clientid = '" + clientid + "' " + ", userName = '" + userName + "' "
+ ", useInboundSession = '" + useInboundSession + "' " + " }";
return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ")
.append(", serverUrl = '").append(serverUrl).append("' ")
.append(", clientid = '").append(clientid).append("' ")
.append(", userName = '").append(userName).append("' ")
.append(", useInboundSession = '").append(useInboundSession).append("' }")
.toString();
}
public Boolean getUseInboundSession() {

View File

@ -0,0 +1,388 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Abstract base class providing support for creating physical
* connections to an ActiveMQ instance.
*
* @version $Revision$
*/
public class ActiveMQConnectionSupport {
private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
protected Log log = LogFactory.getLog(getClass());
/**
* Creates a factory for obtaining physical connections to an Active MQ
* broker. The factory is configured with the given configuration information.
*
* @param connectionRequestInfo the configuration request information
* @return the connection factory
* @throws java.lang.IllegalArgumentException if the server URL given in the
* configuration information is not a valid URL
*/
protected ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo connectionRequestInfo) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
connectionRequestInfo.configure(factory);
return factory;
}
/**
* Creates a new physical connection to an Active MQ broker identified by given
* connection request information.
*
* @param connectionRequestInfo the connection request information identifying the broker and any
* required connection parameters, e.g. username/password
* @return the physical connection
* @throws JMSException if the connection could not be established
*/
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException{
return makeConnection(connectionRequestInfo, createConnectionFactory(connectionRequestInfo));
}
/**
* Creates a new physical connection to an Active MQ broker using a given
* connection factory and credentials supplied in connection request information.
*
* @param connectionRequestInfo the connection request information containing the credentials to use
* for the connection request
* @return the physical connection
* @throws JMSException if the connection could not be established
*/
protected ActiveMQConnection makeConnection(
ActiveMQConnectionRequestInfo connectionRequestInfo,
ActiveMQConnectionFactory connectionFactory) throws JMSException
{
String userName = connectionRequestInfo.getUserName();
String password = connectionRequestInfo.getPassword();
ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
String clientId = connectionRequestInfo.getClientid();
if ( clientId != null && clientId.length() > 0 )
{
physicalConnection.setClientID(clientId);
}
return physicalConnection;
}
/**
* Gets the connection request information.
*
* @return the connection request information
*/
public ActiveMQConnectionRequestInfo getInfo()
{
return info;
}
/**
* Sets the connection request information as a whole.
*
* @param the connection request information
*/
protected void setInfo(ActiveMQConnectionRequestInfo connectionRequestInfo){
info = connectionRequestInfo;
}
protected boolean notEqual(Object o1, Object o2) {
return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
}
protected String emptyToNull(String value) {
if (value == null || value.length() == 0)
{
return null;
}
else
{
return value;
}
}
protected String defaultValue(String value, String defaultValue) {
if (value != null) {
return value;
}
return defaultValue;
}
// ///////////////////////////////////////////////////////////////////////
//
// Java Bean getters and setters for this ResourceAdapter class.
//
// ///////////////////////////////////////////////////////////////////////
/**
* @return client id
*/
public String getClientid() {
return emptyToNull(info.getClientid());
}
/**
* @param clientid
*/
public void setClientid(String clientid) {
if ( log.isDebugEnabled() ) {
log.debug("setting [clientid] to: " + clientid);
}
info.setClientid(clientid);
}
/**
* @return password
*/
public String getPassword() {
return emptyToNull(info.getPassword());
}
/**
* @param password
*/
public void setPassword(String password) {
if ( log.isDebugEnabled() ) {
log.debug("setting [password] property");
}
info.setPassword(password);
}
/**
* @return server URL
*/
public String getServerUrl() {
return info.getServerUrl();
}
/**
* @param url
*/
public void setServerUrl(String url) {
if ( log.isDebugEnabled() ) {
log.debug("setting [serverUrl] to: " + url);
}
info.setServerUrl(url);
}
/**
* @return user name
*/
public String getUserName() {
return emptyToNull(info.getUserName());
}
/**
* @param userid
*/
public void setUserName(String userid) {
if ( log.isDebugEnabled() ) {
log.debug("setting [userName] to: " + userid);
}
info.setUserName(userid);
}
/**
* @return durable topic prefetch
*/
public Integer getDurableTopicPrefetch() {
return info.getDurableTopicPrefetch();
}
/**
* @param durableTopicPrefetch
*/
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
if ( log.isDebugEnabled() ) {
log.debug("setting [durableTopicPrefetch] to: " + durableTopicPrefetch);
}
info.setDurableTopicPrefetch(durableTopicPrefetch);
}
/**
* @return initial redelivery delay
*/
public Long getInitialRedeliveryDelay() {
return info.getInitialRedeliveryDelay();
}
/**
* @param value
*/
public void setInitialRedeliveryDelay(Long value) {
if ( log.isDebugEnabled() ) {
log.debug("setting [initialRedeliveryDelay] to: " + value);
}
info.setInitialRedeliveryDelay(value);
}
/**
* @return input stream prefetch
*/
public Integer getInputStreamPrefetch() {
return info.getInputStreamPrefetch();
}
/**
* @param inputStreamPrefetch
*/
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
if ( log.isDebugEnabled() ) {
log.debug("setting [inputStreamPrefetch] to: " + inputStreamPrefetch);
}
info.setInputStreamPrefetch(inputStreamPrefetch);
}
/**
* @return maximum redeliveries
*/
public Integer getMaximumRedeliveries() {
return info.getMaximumRedeliveries();
}
/**
* @param value
*/
public void setMaximumRedeliveries(Integer value) {
if ( log.isDebugEnabled() ) {
log.debug("setting [maximumRedeliveries] to: " + value);
}
info.setMaximumRedeliveries(value);
}
/**
* @return queue browser prefetch
*/
public Integer getQueueBrowserPrefetch() {
return info.getQueueBrowserPrefetch();
}
/**
* @param queueBrowserPrefetch
*/
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
if ( log.isDebugEnabled() ) {
log.debug("setting [queueBrowserPrefetch] to: " + queueBrowserPrefetch);
}
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
}
/**
* @return queue prefetch
*/
public Integer getQueuePrefetch() {
return info.getQueuePrefetch();
}
/**
* @param queuePrefetch
*/
public void setQueuePrefetch(Integer queuePrefetch) {
if ( log.isDebugEnabled() ) {
log.debug("setting [queuePrefetch] to: " + queuePrefetch);
}
info.setQueuePrefetch(queuePrefetch);
}
/**
* @return redelivery backoff multiplier
*/
public Short getRedeliveryBackOffMultiplier() {
return info.getRedeliveryBackOffMultiplier();
}
/**
* @param value
*/
public void setRedeliveryBackOffMultiplier(Short value) {
if ( log.isDebugEnabled() ) {
log.debug("setting [redeliveryBackOffMultiplier] to: " + value);
}
info.setRedeliveryBackOffMultiplier(value);
}
/**
* @return redelivery use exponential backoff
*/
public Boolean getRedeliveryUseExponentialBackOff() {
return info.getRedeliveryUseExponentialBackOff();
}
/**
* @param value
*/
public void setRedeliveryUseExponentialBackOff(Boolean value) {
if ( log.isDebugEnabled() ) {
log.debug("setting [redeliveryUseExponentialBackOff] to: " + value);
}
info.setRedeliveryUseExponentialBackOff(value);
}
/**
* @return topic prefetch
*/
public Integer getTopicPrefetch() {
return info.getTopicPrefetch();
}
/**
* @param topicPrefetch
*/
public void setTopicPrefetch(Integer topicPrefetch) {
if ( log.isDebugEnabled() ) {
log.debug("setting [topicPrefetch] to: " + topicPrefetch);
}
info.setTopicPrefetch(topicPrefetch);
}
/**
* @param i
*/
public void setAllPrefetchValues(Integer i) {
info.setAllPrefetchValues(i);
}
/**
* @return use inbound session enabled
*/
public boolean isUseInboundSessionEnabled() {
return info.isUseInboundSessionEnabled();
}
/**
* @return use inbound session
*/
public Boolean getUseInboundSession() {
return info.getUseInboundSession();
}
/**
* @param useInboundSession
*/
public void setUseInboundSession(Boolean useInboundSession) {
if ( log.isDebugEnabled() ) {
log.debug("setting [useInboundSession] to: " + useInboundSession);
}
info.setUseInboundSession(useInboundSession);
}
}

View File

@ -16,7 +16,11 @@
*/
package org.apache.activemq.ra;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
@ -29,18 +33,17 @@ import javax.resource.spi.ManagedConnectionFactory;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterAssociation;
import javax.security.auth.Subject;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revisio n$ TODO: Must override equals and hashCode (JCA spec 16.4)
* @org.apache.xbean.XBean element="managedConnectionFactory"
*/
public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation {
public class ActiveMQManagedConnectionFactory extends ActiveMQConnectionSupport
implements ManagedConnectionFactory, ResourceAdapterAssociation {
private static final long serialVersionUID = 6196921962230582875L;
private MessageResourceAdapter adapter;
private PrintWriter logWriter;
private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
/**
* @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter)
@ -49,23 +52,35 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
if (!(adapter instanceof MessageResourceAdapter)) {
throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName());
}
this.adapter = (MessageResourceAdapter)adapter;
ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy();
if (info.getClientid() == null) {
info.setClientid(baseInfo.getClientid());
else
{
if ( log.isDebugEnabled() ) {
log.debug("copying standard ResourceAdapter configuration properties");
}
if (info.getPassword() == null) {
info.setPassword(baseInfo.getPassword());
ActiveMQConnectionRequestInfo baseInfo = ((MessageResourceAdapter) adapter).getInfo().copy();
if (getClientid() == null) {
setClientid(baseInfo.getClientid());
}
if (info.getServerUrl() == null) {
info.setServerUrl(baseInfo.getServerUrl());
if (getPassword() == null) {
setPassword(baseInfo.getPassword());
}
if (info.getUseInboundSession() == null) {
info.setUseInboundSession(baseInfo.getUseInboundSession());
if (getServerUrl() == null) {
setServerUrl(baseInfo.getServerUrl());
}
if (info.getUserName() == null) {
info.setUserName(baseInfo.getUserName());
if (getUseInboundSession() == null) {
setUseInboundSession(baseInfo.getUseInboundSession());
}
if (getUserName() == null) {
setUserName(baseInfo.getUserName());
}
}
}
/**
* @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
*/
public ResourceAdapter getResourceAdapter() {
return null;
}
/**
@ -76,7 +91,7 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
if (object == null || object.getClass() != ActiveMQManagedConnectionFactory.class) {
return false;
}
return ((ActiveMQManagedConnectionFactory)object).info.equals(info);
return ((ActiveMQManagedConnectionFactory)object).getInfo().equals(getInfo());
}
/**
@ -84,21 +99,47 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
*/
@Override
public int hashCode() {
return info.hashCode();
return getInfo().hashCode();
}
/**
* @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
* Writes this factory during serialization along with the superclass' <i>info</i> property.
* This needs to be done manually since the superclass is not serializable itself.
*
* @param out the stream to write object state to
* @throws java.io.IOException if the object cannot be serialized
*/
public ResourceAdapter getResourceAdapter() {
return adapter;
private void writeObject(ObjectOutputStream out) throws IOException {
if ( logWriter != null && !(logWriter instanceof Serializable) ) {
// if the PrintWriter injected by the application server is not
// serializable we just drop the reference and let the application
// server re-inject a PrintWriter later (after this factory has been
// deserialized again) using the standard setLogWriter() method
logWriter = null;
}
out.defaultWriteObject();
out.writeObject(getInfo());
}
/**
* Restores this factory along with the superclass' <i>info</i> property.
* This needs to be done manually since the superclass is not serializable itself.
*
* @param in the stream to read object state from
* @throws java.io.IOException if the object state could not be restored
* @throws java.lang.ClassNotFoundException if the object state could not be restored
*/
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
setInfo((ActiveMQConnectionRequestInfo) in.readObject());
log = LogFactory.getLog(getClass());
}
/**
* @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory(javax.resource.spi.ConnectionManager)
*/
public Object createConnectionFactory(ConnectionManager manager) throws ResourceException {
return new ActiveMQConnectionFactory(this, manager, info);
return new ActiveMQConnectionFactory(this, manager, getInfo());
}
/**
@ -110,20 +151,22 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
* @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory()
*/
public Object createConnectionFactory() throws ResourceException {
return new ActiveMQConnectionFactory(this, new SimpleConnectionManager(), info);
return new ActiveMQConnectionFactory(this, new SimpleConnectionManager(), getInfo());
}
/**
* @see javax.resource.spi.ManagedConnectionFactory#createManagedConnection(javax.security.auth.Subject,
* javax.resource.spi.ConnectionRequestInfo)
*/
public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
try {
if (info == null) {
info = this.info;
public ManagedConnection createManagedConnection(
Subject subject,
ConnectionRequestInfo connectionRequestInfo) throws ResourceException {
ActiveMQConnectionRequestInfo amqInfo = getInfo();
if ( connectionRequestInfo instanceof ActiveMQConnectionRequestInfo ) {
amqInfo = (ActiveMQConnectionRequestInfo) connectionRequestInfo;
}
ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo)info;
return new ActiveMQManagedConnection(subject, adapter.makeConnection(amqInfo), amqInfo);
try {
return new ActiveMQManagedConnection(subject, makeConnection(amqInfo), amqInfo);
} catch (JMSException e) {
throw new ResourceException("Could not create connection.", e);
}
@ -134,13 +177,16 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
* javax.security.auth.Subject,
* javax.resource.spi.ConnectionRequestInfo)
*/
public ManagedConnection matchManagedConnections(Set connections, Subject subject, ConnectionRequestInfo info) throws ResourceException {
public ManagedConnection matchManagedConnections(
Set connections,
Subject subject,
ConnectionRequestInfo connectionRequestInfo) throws ResourceException {
Iterator iterator = connections.iterator();
while (iterator.hasNext()) {
ActiveMQManagedConnection c = (ActiveMQManagedConnection)iterator.next();
if (c.matches(subject, info)) {
if (c.matches(subject, connectionRequestInfo)) {
try {
c.associate(subject, (ActiveMQConnectionRequestInfo)info);
c.associate(subject, (ActiveMQConnectionRequestInfo) connectionRequestInfo);
return c;
} catch (JMSException e) {
throw new ResourceException(e);
@ -153,221 +199,21 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
/**
* @see javax.resource.spi.ManagedConnectionFactory#setLogWriter(java.io.PrintWriter)
*/
public void setLogWriter(PrintWriter logWriter) throws ResourceException {
this.logWriter = logWriter;
public void setLogWriter(PrintWriter aLogWriter) throws ResourceException {
if ( log.isTraceEnabled() ) {
log.trace("setting log writer [" + aLogWriter + "]");
}
this.logWriter = aLogWriter;
}
/**
* @see javax.resource.spi.ManagedConnectionFactory#getLogWriter()
*/
public PrintWriter getLogWriter() throws ResourceException {
if ( log.isTraceEnabled() ) {
log.trace("getting log writer [" + logWriter + "]");
}
return logWriter;
}
// /////////////////////////////////////////////////////////////////////////
//
// Bean setters and getters.
//
// /////////////////////////////////////////////////////////////////////////
/**
*
*/
public String getClientid() {
return info.getClientid();
}
/**
*
*/
public String getPassword() {
return info.getPassword();
}
/**
*
*/
public String getUserName() {
return info.getUserName();
}
/**
*
*/
public void setClientid(String clientid) {
info.setClientid(clientid);
}
/**
*
*/
public void setPassword(String password) {
info.setPassword(password);
}
/**
*
*/
public void setUserName(String userid) {
info.setUserName(userid);
}
/**
*
*/
/**
*
*/
public Boolean getUseInboundSession() {
return info.getUseInboundSession();
}
/**
*
*/
public void setUseInboundSession(Boolean useInboundSession) {
info.setUseInboundSession(useInboundSession);
}
/**
*
*/
public boolean isUseInboundSessionEnabled() {
return info.isUseInboundSessionEnabled();
}
// Redelivery policy configuration
/**
*
*/
public Long getInitialRedeliveryDelay() {
return info.getInitialRedeliveryDelay();
}
/**
*
*/
public Integer getMaximumRedeliveries() {
return info.getMaximumRedeliveries();
}
/**
*
*/
public Short getRedeliveryBackOffMultiplier() {
return info.getRedeliveryBackOffMultiplier();
}
/**
*
*/
public Boolean getRedeliveryUseExponentialBackOff() {
return info.getRedeliveryUseExponentialBackOff();
}
/**
*
*/
public void setInitialRedeliveryDelay(Long value) {
info.setInitialRedeliveryDelay(value);
}
/**
*
*/
public void setMaximumRedeliveries(Integer value) {
info.setMaximumRedeliveries(value);
}
/**
*
*/
public void setRedeliveryBackOffMultiplier(Short value) {
info.setRedeliveryBackOffMultiplier(value);
}
/**
*
*/
public void setRedeliveryUseExponentialBackOff(Boolean value) {
info.setRedeliveryUseExponentialBackOff(value);
}
// Prefetch policy configuration
/**
*
*/
public Integer getDurableTopicPrefetch() {
return info.getDurableTopicPrefetch();
}
/**
*
*/
public Integer getInputStreamPrefetch() {
return info.getInputStreamPrefetch();
}
/**
*
*/
public Integer getQueueBrowserPrefetch() {
return info.getQueueBrowserPrefetch();
}
/**
*
*/
public Integer getQueuePrefetch() {
return info.getQueuePrefetch();
}
/**
*
*/
public Integer getTopicPrefetch() {
return info.getTopicPrefetch();
}
/**
*
*/
public void setAllPrefetchValues(Integer i) {
info.setAllPrefetchValues(i);
}
/**
*
*/
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
info.setDurableTopicPrefetch(durableTopicPrefetch);
}
/**
*
*/
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
info.setInputStreamPrefetch(inputStreamPrefetch);
}
/**
*
*/
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
}
/**
*
*/
public void setQueuePrefetch(Integer queuePrefetch) {
info.setQueuePrefetch(queuePrefetch);
}
/**
* @param topicPrefetch
*/
public void setTopicPrefetch(Integer topicPrefetch) {
info.setTopicPrefetch(topicPrefetch);
}
}

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.ra;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import javax.jms.Connection;
@ -39,8 +37,6 @@ import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
@ -51,18 +47,13 @@ import org.apache.commons.logging.LogFactory;
* description="The JCA Resource Adaptor for ActiveMQ"
* @version $Revision$
*/
public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable {
private static final long serialVersionUID = -5417363537865649130L;
private static final Log LOG = LogFactory.getLog(ActiveMQResourceAdapter.class);
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
private BootstrapContext bootstrapContext;
private String brokerXmlConfig;
private BrokerService broker;
private ActiveMQConnectionFactory connectionFactory;
private Thread brokerStartThread;
/**
@ -86,8 +77,8 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
}
broker.start();
} catch (Throwable e) {
LOG.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
LOG.debug("Reason for: "+e.getMessage(), e);
log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
log.debug("Reason for: "+e.getMessage(), e);
}
}
};
@ -107,49 +98,22 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
* @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
*/
public ActiveMQConnection makeConnection() throws JMSException {
if (connectionFactory != null) {
return makeConnection(info, connectionFactory);
}
return makeConnection(info);
}
/**
*/
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
return makeConnection(info, connectionFactory);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo,
* org.apache.activemq.ActiveMQConnectionFactory)
*/
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException {
String userName = info.getUserName();
String password = info.getPassword();
ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password);
String clientId = info.getClientid();
if (clientId != null && clientId.length() > 0) {
physicalConnection.setClientID(clientId);
}
return physicalConnection;
return makeConnection(getInfo());
}
/**
* @param activationSpec
*/
public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
String password = defaultValue(activationSpec.getPassword(), info.getPassword());
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(getInfo());
String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
String clientId = activationSpec.getClientId();
if (clientId != null) {
connectionFactory.setClientID(clientId);
} else {
if (activationSpec.isDurableSubscription()) {
LOG.warn("No clientID specified for durable subscription: " + activationSpec);
log.warn("No clientID specified for durable subscription: " + activationSpec);
}
}
ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password);
@ -162,29 +126,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
return physicalConnection;
}
/**
* @param info
* @throws JMSException
* @throws URISyntaxException
*/
private synchronized ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException {
ActiveMQConnectionFactory factory = connectionFactory;
if (factory != null && info.isConnectionFactoryConfigured()) {
factory = factory.copy();
} else if (factory == null) {
factory = new ActiveMQConnectionFactory();
}
info.configure(factory);
return factory;
}
private String defaultValue(String value, String defaultValue) {
if (value != null) {
return value;
}
return defaultValue;
}
/**
* @see javax.resource.spi.ResourceAdapter#stop()
*/
@ -306,62 +247,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
//
// ///////////////////////////////////////////////////////////////////////
/**
* @return client id
*/
public String getClientid() {
return emptyToNull(info.getClientid());
}
/**
* @return password
*/
public String getPassword() {
return emptyToNull(info.getPassword());
}
/**
* @return server URL
*/
public String getServerUrl() {
return info.getServerUrl();
}
/**
* @return user name
*/
public String getUserName() {
return emptyToNull(info.getUserName());
}
/**
* @param clientid
*/
public void setClientid(String clientid) {
info.setClientid(clientid);
}
/**
* @param password
*/
public void setPassword(String password) {
info.setPassword(password);
}
/**
* @param url
*/
public void setServerUrl(String url) {
info.setServerUrl(url);
}
/**
* @param userid
*/
public void setUserName(String userid) {
info.setUserName(userid);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
*/
@ -384,153 +269,6 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
this.brokerXmlConfig = brokerXmlConfig;
}
/**
* @return durable topic prefetch
*/
public Integer getDurableTopicPrefetch() {
return info.getDurableTopicPrefetch();
}
/**
* @return initial redelivery delay
*/
public Long getInitialRedeliveryDelay() {
return info.getInitialRedeliveryDelay();
}
/**
* @return input stream prefetch
*/
public Integer getInputStreamPrefetch() {
return info.getInputStreamPrefetch();
}
/**
* @return maximum redeliveries
*/
public Integer getMaximumRedeliveries() {
return info.getMaximumRedeliveries();
}
/**
* @return queue browser prefetch
*/
public Integer getQueueBrowserPrefetch() {
return info.getQueueBrowserPrefetch();
}
/**
* @return queue prefetch
*/
public Integer getQueuePrefetch() {
return info.getQueuePrefetch();
}
/**
* @return redelivery backoff multiplier
*/
public Short getRedeliveryBackOffMultiplier() {
return info.getRedeliveryBackOffMultiplier();
}
/**
* @return redelivery use exponential backoff
*/
public Boolean getRedeliveryUseExponentialBackOff() {
return info.getRedeliveryUseExponentialBackOff();
}
/**
* @return topic prefetch
*/
public Integer getTopicPrefetch() {
return info.getTopicPrefetch();
}
/**
* @return use inbound session enabled
*/
public boolean isUseInboundSessionEnabled() {
return info.isUseInboundSessionEnabled();
}
/**
* @param i
*/
public void setAllPrefetchValues(Integer i) {
info.setAllPrefetchValues(i);
}
/**
* @param durableTopicPrefetch
*/
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
info.setDurableTopicPrefetch(durableTopicPrefetch);
}
/**
* @param value
*/
public void setInitialRedeliveryDelay(Long value) {
info.setInitialRedeliveryDelay(value);
}
/**
* @param inputStreamPrefetch
*/
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
info.setInputStreamPrefetch(inputStreamPrefetch);
}
/**
* @param value
*/
public void setMaximumRedeliveries(Integer value) {
info.setMaximumRedeliveries(value);
}
/**
* @param queueBrowserPrefetch
*/
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
}
/**
* @param queuePrefetch
*/
public void setQueuePrefetch(Integer queuePrefetch) {
info.setQueuePrefetch(queuePrefetch);
}
/**
* @param value
*/
public void setRedeliveryBackOffMultiplier(Short value) {
info.setRedeliveryBackOffMultiplier(value);
}
/**
* @param value
*/
public void setRedeliveryUseExponentialBackOff(Boolean value) {
info.setRedeliveryUseExponentialBackOff(value);
}
/**
* @param topicPrefetch
*/
public void setTopicPrefetch(Integer topicPrefetch) {
info.setTopicPrefetch(topicPrefetch);
}
/**
* @return Returns the info.
*/
public ActiveMQConnectionRequestInfo getInfo() {
return info;
}
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@ -545,7 +283,7 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
if (!info.equals(activeMQResourceAdapter.getInfo())) {
if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
return false;
}
if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
@ -555,60 +293,18 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
return true;
}
private boolean notEqual(Object o1, Object o2) {
return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
}
/**
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
int result;
result = info.hashCode();
result = getInfo().hashCode();
if (brokerXmlConfig != null) {
result ^= brokerXmlConfig.hashCode();
}
return result;
}
private String emptyToNull(String value) {
if (value == null || value.length() == 0) {
return null;
}
return value;
}
/**
* @return use inbound session
*/
public Boolean getUseInboundSession() {
return info.getUseInboundSession();
}
/**
* @param useInboundSession
*/
public void setUseInboundSession(Boolean useInboundSession) {
info.setUseInboundSession(useInboundSession);
}
/**
* @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory()
*/
public ActiveMQConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* This allows a connection factory to be configured and shared between a
* ResourceAdaptor and outbound messaging. Note that setting the
* connectionFactory will overload many of the properties on this POJO such
* as the redelivery and prefetch policies; the properties on the
* connectionFactory will be used instead.
*/
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
}

View File

@ -22,7 +22,6 @@ import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
@ -33,18 +32,10 @@ import org.apache.activemq.ActiveMQConnectionFactory;
*/
interface MessageResourceAdapter extends ResourceAdapter {
/**
*/
ActiveMQConnection makeConnection() throws JMSException;
/**
*/
ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException;
/**
*/
ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException;
/**
* @param activationSpec
*/
@ -64,8 +55,4 @@ interface MessageResourceAdapter extends ResourceAdapter {
*/
ActiveMQConnectionRequestInfo getInfo();
/**
*/
ActiveMQConnectionFactory getConnectionFactory();
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2008 hak8fe.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* under the License.
*/
package org.apache.activemq.ra;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import javax.jms.Connection;
import junit.framework.TestCase;
/**
*
* @author hak8fe
*/
public class ActiveMQConnectionFactoryTest extends TestCase {
ActiveMQManagedConnectionFactory mcf;
ActiveMQConnectionRequestInfo info;
String url = "vm://localhost";
String user = "defaultUser";
String pwd = "defaultPasswd";
public ActiveMQConnectionFactoryTest(String testName) {
super(testName);
}
@Override
protected void setUp() throws Exception {
super.setUp();
mcf = new ActiveMQManagedConnectionFactory();
info = new ActiveMQConnectionRequestInfo();
info.setServerUrl(url);
info.setUserName(user);
info.setPassword(pwd);
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public void testSerializability() throws Exception
{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(mcf, new ConnectionManagerAdapter(), info);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(factory);
oos.close();
byte[] byteArray = bos.toByteArray();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray));
ActiveMQConnectionFactory deserializedFactory = (ActiveMQConnectionFactory) ois.readObject();
ois.close();
Connection con = deserializedFactory.createConnection("defaultUser", "defaultPassword");
assertNotNull("Connection object returned by ActiveMQConnectionFactory.createConnection() is null", con);
}
}

View File

@ -16,9 +16,13 @@
*/
package org.apache.activemq.ra;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Timer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -27,13 +31,9 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnectionFactory;
import javax.resource.Referenceable;
import javax.resource.ResourceException;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.XATerminator;
import javax.resource.spi.work.WorkManager;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -52,27 +52,10 @@ public class ManagedConnectionFactoryTest extends TestCase {
*/
protected void setUp() throws Exception {
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl(DEFAULT_HOST);
adapter.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
adapter.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
adapter.start(new BootstrapContext() {
public WorkManager getWorkManager() {
return null;
}
public XATerminator getXATerminator() {
return null;
}
public Timer createTimer() throws UnavailableException {
return null;
}
});
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setResourceAdapter(adapter);
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
}
public void testConnectionFactoryAllocation() throws ResourceException, JMSException {
@ -155,4 +138,33 @@ public class ManagedConnectionFactoryTest extends TestCase {
assertTrue(cf instanceof TopicConnectionFactory);
}
public void testSerializability() throws Exception {
managedConnectionFactory.setLogWriter(new PrintWriter(new ByteArrayOutputStream()));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(managedConnectionFactory);
oos.close();
byte[] byteArray = bos.toByteArray();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray));
ActiveMQManagedConnectionFactory deserializedFactory = (ActiveMQManagedConnectionFactory) ois.readObject();
ois.close();
assertNull(
"[logWriter] property of deserialized ActiveMQManagedConnectionFactory is not null",
deserializedFactory.getLogWriter());
assertNotNull(
"ConnectionRequestInfo of deserialized ActiveMQManagedConnectionFactory is null",
deserializedFactory.getInfo());
assertEquals(
"[serverUrl] property of deserialized ConnectionRequestInfo object is not [" + DEFAULT_HOST + "]",
DEFAULT_HOST,
deserializedFactory.getInfo().getServerUrl());
assertNotNull(
"Log instance of deserialized ActiveMQManagedConnectionFactory is null",
deserializedFactory.log);
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.ra;
import java.util.Timer;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
@ -28,11 +26,7 @@ import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.resource.ResourceException;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.XATerminator;
import javax.resource.spi.work.WorkManager;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -55,26 +49,10 @@ public class ManagedConnectionTest extends TestCase {
*/
protected void setUp() throws Exception {
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl(DEFAULT_HOST);
adapter.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
adapter.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
adapter.start(new BootstrapContext() {
public WorkManager getWorkManager() {
return null;
}
public XATerminator getXATerminator() {
return null;
}
public Timer createTimer() throws UnavailableException {
return null;
}
});
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setResourceAdapter(adapter);
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
connectionFactory = (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
connection = (ManagedConnectionProxy)connectionFactory.createConnection();