mirror of https://github.com/apache/activemq.git
applied modified version of AMQ-1147 (keeping the implementation classes the same name to avoid issues with existing RA configurations)
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515574 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
63b82516a4
commit
d7355e7874
|
@ -1,605 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import java.beans.IntrospectionException;
|
||||
import java.beans.PropertyDescriptor;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
import javax.resource.ResourceException;
|
||||
import javax.resource.spi.ActivationSpec;
|
||||
import javax.resource.spi.InvalidPropertyException;
|
||||
import javax.resource.spi.ResourceAdapter;
|
||||
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
|
||||
/**
|
||||
* Configures the inbound JMS consumer specification using ActiveMQ
|
||||
*
|
||||
* @org.apache.xbean.XBean element="activationSpec"
|
||||
*
|
||||
* @version $Revision$ $Date$
|
||||
*/
|
||||
public class ActiveMQActivationSpec implements ActivationSpec, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -7153087544100459975L;
|
||||
|
||||
/** Auto-acknowledge constant for <code>acknowledgeMode</code> property **/
|
||||
public static final String AUTO_ACKNOWLEDGE_MODE = "Auto-acknowledge";
|
||||
/** Dups-ok-acknowledge constant for <code>acknowledgeMode</code> property * */
|
||||
public static final String DUPS_OK_ACKNOWLEDGE_MODE = "Dups-ok-acknowledge";
|
||||
/** Durable constant for <code>subscriptionDurability</code> property * */
|
||||
public static final String DURABLE_SUBSCRIPTION = "Durable";
|
||||
/** NonDurable constant for <code>subscriptionDurability</code> property * */
|
||||
public static final String NON_DURABLE_SUBSCRIPTION = "NonDurable";
|
||||
|
||||
public static final int INVALID_ACKNOWLEDGE_MODE = -1;
|
||||
|
||||
private transient ActiveMQResourceAdapter resourceAdapter;
|
||||
private String destinationType;
|
||||
private String messageSelector;
|
||||
private String destination;
|
||||
private String acknowledgeMode = AUTO_ACKNOWLEDGE_MODE;
|
||||
private String userName;
|
||||
private String password;
|
||||
private String clientId;
|
||||
private String subscriptionName;
|
||||
private String subscriptionDurability = NON_DURABLE_SUBSCRIPTION;
|
||||
private String noLocal = "false";
|
||||
private String useRAManagedTransaction = "false";
|
||||
private String maxSessions="10";
|
||||
private String maxMessagesPerSessions="10";
|
||||
private String enableBatch = "false";
|
||||
private String maxMessagesPerBatch = "10";
|
||||
private RedeliveryPolicy redeliveryPolicy;
|
||||
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ActivationSpec#validate()
|
||||
*/
|
||||
public void validate() throws InvalidPropertyException {
|
||||
List errorMessages = new ArrayList();
|
||||
List propsNotSet = new ArrayList();
|
||||
try {
|
||||
if (!isValidDestination(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class));
|
||||
if (!isValidDestinationType(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class));
|
||||
if (!isValidAcknowledgeMode(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("acknowledgeMode", ActiveMQActivationSpec.class));
|
||||
if (!isValidSubscriptionDurability(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("subscriptionDurability", ActiveMQActivationSpec.class));
|
||||
if (!isValidClientId(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("clientId", ActiveMQActivationSpec.class));
|
||||
if (!isValidSubscriptionName(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class));
|
||||
if (!isValidMaxMessagesPerSessions(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("maxMessagesPerSessions", ActiveMQActivationSpec.class));
|
||||
if (!isValidMaxSessions(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("maxSessions", ActiveMQActivationSpec.class));
|
||||
if (!isValidMessageSelector(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("messageSelector", ActiveMQActivationSpec.class));
|
||||
if (!isValidNoLocal(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("noLocal", ActiveMQActivationSpec.class));
|
||||
if (!isValidUseRAManagedTransaction(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("useRAManagedTransaction", ActiveMQActivationSpec.class));
|
||||
if (!isValidEnableBatch(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("enableBatch", ActiveMQActivationSpec.class));
|
||||
if (!isValidMaxMessagesPerBatch(errorMessages))
|
||||
propsNotSet.add(new PropertyDescriptor("maxMessagesPerBatch", ActiveMQActivationSpec.class));
|
||||
|
||||
|
||||
} catch (IntrospectionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
if (propsNotSet.size() > 0) {
|
||||
StringBuffer b = new StringBuffer();
|
||||
b.append("Invalid settings:");
|
||||
for (Iterator iter = errorMessages.iterator(); iter.hasNext();) {
|
||||
b.append(" ");
|
||||
b.append(iter.next());
|
||||
}
|
||||
InvalidPropertyException e = new InvalidPropertyException(b.toString());
|
||||
final PropertyDescriptor[] descriptors = (PropertyDescriptor[]) propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]);
|
||||
e.setInvalidPropertyDescriptors(descriptors);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isValidUseRAManagedTransaction(List errorMessages) {
|
||||
try {
|
||||
new Boolean(noLocal);
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
errorMessages.add("noLocal must be set to: true or false.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidNoLocal(List errorMessages) {
|
||||
try {
|
||||
new Boolean(noLocal);
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
errorMessages.add("noLocal must be set to: true or false.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidMessageSelector(List errorMessages) {
|
||||
try {
|
||||
if( !isEmpty(messageSelector) ) {
|
||||
new SelectorParser().parse(messageSelector);
|
||||
}
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
errorMessages.add("messageSelector not set to valid message selector: "+e.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isValidMaxSessions(List errorMessages) {
|
||||
try {
|
||||
if( Integer.parseInt(maxSessions) > 0 ) {
|
||||
return true;
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
errorMessages.add("maxSessions must be set to number > 0");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidMaxMessagesPerSessions(List errorMessages) {
|
||||
try {
|
||||
if( Integer.parseInt(maxMessagesPerSessions) > 0 ) {
|
||||
return true;
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
errorMessages.add("maxMessagesPerSessions must be set to number > 0");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidMaxMessagesPerBatch(List errorMessages) {
|
||||
try {
|
||||
if( Integer.parseInt(maxMessagesPerBatch) > 0 ) {
|
||||
return true;
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
errorMessages.add("maxMessagesPerBatch must be set to number > 0");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidEnableBatch(List errorMessages) {
|
||||
try {
|
||||
new Boolean(enableBatch);
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
errorMessages.add("enableBatch must be set to: true or false");
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
|
||||
*/
|
||||
public ResourceAdapter getResourceAdapter() {
|
||||
return resourceAdapter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter)
|
||||
*/
|
||||
public void setResourceAdapter(ResourceAdapter resourceAdapter) throws ResourceException {
|
||||
//spec section 5.3.3
|
||||
if (this.resourceAdapter != null) {
|
||||
throw new ResourceException("ResourceAdapter already set");
|
||||
}
|
||||
if (!(resourceAdapter instanceof ActiveMQResourceAdapter)) {
|
||||
throw new ResourceException("ResourceAdapter is not of type: " + ActiveMQResourceAdapter.class.getName());
|
||||
}
|
||||
this.resourceAdapter = (ActiveMQResourceAdapter) resourceAdapter;
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Java Bean getters and setters for this ActivationSpec class.
|
||||
//
|
||||
/////////////////////////////////////////////////////////////////////////
|
||||
/**
|
||||
* @return Returns the destinationType.
|
||||
*/
|
||||
public String getDestinationType() {
|
||||
if (!isEmpty(destinationType)) {
|
||||
return destinationType;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param destinationType The destinationType to set.
|
||||
*/
|
||||
public void setDestinationType(String destinationType) {
|
||||
this.destinationType = destinationType;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
if (!isEmpty(password)) {
|
||||
return password;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
if (!isEmpty(userName)) {
|
||||
return userName;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setUserName(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the messageSelector.
|
||||
*/
|
||||
public String getMessageSelector() {
|
||||
if (!isEmpty(messageSelector)) {
|
||||
return messageSelector;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param messageSelector The messageSelector to set.
|
||||
*/
|
||||
public void setMessageSelector(String messageSelector) {
|
||||
this.messageSelector = messageSelector;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the noLocal.
|
||||
*/
|
||||
public String getNoLocal() {
|
||||
return noLocal;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param noLocal The noLocal to set.
|
||||
*/
|
||||
public void setNoLocal(String noLocal) {
|
||||
if( noLocal!=null ) {
|
||||
this.noLocal = noLocal;
|
||||
}
|
||||
}
|
||||
|
||||
public String getAcknowledgeMode() {
|
||||
if (!isEmpty(acknowledgeMode)) {
|
||||
return acknowledgeMode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setAcknowledgeMode(String acknowledgeMode) {
|
||||
this.acknowledgeMode = acknowledgeMode;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
if (!isEmpty(clientId)) {
|
||||
return clientId;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public String getDestination() {
|
||||
if (!isEmpty(destination)) {
|
||||
return destination;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setDestination(String destination) {
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
public String getSubscriptionDurability() {
|
||||
if (!isEmpty(subscriptionDurability)) {
|
||||
return subscriptionDurability;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setSubscriptionDurability(String subscriptionDurability) {
|
||||
this.subscriptionDurability = subscriptionDurability;
|
||||
}
|
||||
|
||||
public String getSubscriptionName() {
|
||||
if (!isEmpty(subscriptionName)) {
|
||||
return subscriptionName;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setSubscriptionName(String subscriptionName) {
|
||||
this.subscriptionName = subscriptionName;
|
||||
}
|
||||
|
||||
private boolean isValidSubscriptionName(List errorMessages) {
|
||||
if( !isDurableSubscription() ? true : subscriptionName != null && subscriptionName.trim().length() > 0 ) {
|
||||
return true;
|
||||
}
|
||||
errorMessages.add("subscriptionName must be set since durable subscription was requested.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidClientId(List errorMessages) {
|
||||
if( !isDurableSubscription() ? true : clientId != null && clientId.trim().length() > 0 ) {
|
||||
return true;
|
||||
}
|
||||
errorMessages.add("clientId must be set since durable subscription was requested.");
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isDurableSubscription() {
|
||||
return DURABLE_SUBSCRIPTION.equals(subscriptionDurability);
|
||||
}
|
||||
|
||||
private boolean isValidSubscriptionDurability(List errorMessages) {
|
||||
// subscriptionDurability only applies to Topics
|
||||
if ( DURABLE_SUBSCRIPTION.equals(subscriptionDurability) &&
|
||||
getDestinationType() != null && !Topic.class.getName().equals(getDestinationType())) {
|
||||
errorMessages.add("subscriptionDurability cannot be set to: "+DURABLE_SUBSCRIPTION+" when destinationType is set to "+
|
||||
Queue.class.getName()+" as it is only valid when destinationType is set to "+Topic.class.getName()+".");
|
||||
return false;
|
||||
}
|
||||
if (NON_DURABLE_SUBSCRIPTION.equals(subscriptionDurability) || DURABLE_SUBSCRIPTION.equals(subscriptionDurability))
|
||||
return true;
|
||||
errorMessages.add("subscriptionDurability must be set to: "+NON_DURABLE_SUBSCRIPTION+" or "+DURABLE_SUBSCRIPTION+".");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidAcknowledgeMode(List errorMessages) {
|
||||
if (AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) || DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode))
|
||||
return true;
|
||||
errorMessages.add("acknowledgeMode must be set to: "+AUTO_ACKNOWLEDGE_MODE+" or "+DUPS_OK_ACKNOWLEDGE_MODE+".");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidDestinationType(List errorMessages) {
|
||||
if (Queue.class.getName().equals(destinationType) || Topic.class.getName().equals(destinationType))
|
||||
return true;
|
||||
errorMessages.add("destinationType must be set to: "+Queue.class.getName()+" or "+Topic.class.getName()+".");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isValidDestination(List errorMessages) {
|
||||
if(!(destination == null || destination.equals("")))
|
||||
return true;
|
||||
errorMessages.add("destination is a required field and must be set to the destination name.");
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isEmpty(String value) {
|
||||
return value == null || "".equals(value.trim());
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "ActiveMQActivationSpec{" +
|
||||
"acknowledgeMode='" + acknowledgeMode + "'" +
|
||||
", destinationType='" + destinationType + "'" +
|
||||
", messageSelector='" + messageSelector + "'" +
|
||||
", destination='" + destination + "'" +
|
||||
", clientId='" + clientId + "'" +
|
||||
", subscriptionName='" + subscriptionName + "'" +
|
||||
", subscriptionDurability='" + subscriptionDurability + "'" +
|
||||
"}";
|
||||
}
|
||||
|
||||
public int getAcknowledgeModeForSession() {
|
||||
if( AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
|
||||
return Session.AUTO_ACKNOWLEDGE;
|
||||
} else if( DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
|
||||
return Session.DUPS_OK_ACKNOWLEDGE;
|
||||
} else {
|
||||
return INVALID_ACKNOWLEDGE_MODE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper method mostly for use in Dependency Injection containers
|
||||
* which allows you to customize the destination and destinationType properties
|
||||
* from a single ActiveMQDestination POJO
|
||||
*/
|
||||
public void setActiveMQDestination(ActiveMQDestination destination) {
|
||||
setDestination(destination.getPhysicalName());
|
||||
if (destination instanceof Queue) {
|
||||
setDestinationType(Queue.class.getName());
|
||||
}
|
||||
else {
|
||||
setDestinationType(Topic.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
public ActiveMQDestination createDestination() {
|
||||
if( isEmpty(destinationType) || isEmpty(destination) )
|
||||
return null;
|
||||
|
||||
ActiveMQDestination dest = null;
|
||||
if (Queue.class.getName().equals(destinationType)) {
|
||||
dest = new ActiveMQQueue(destination);
|
||||
} else if (Topic.class.getName().equals(destinationType)) {
|
||||
dest = new ActiveMQTopic(destination);
|
||||
} else {
|
||||
assert false : "Execution should never reach here";
|
||||
}
|
||||
return dest;
|
||||
}
|
||||
|
||||
public String getMaxMessagesPerSessions() {
|
||||
return maxMessagesPerSessions.toString();
|
||||
}
|
||||
|
||||
public void setMaxMessagesPerSessions(String maxMessagesPerSessions) {
|
||||
if( maxMessagesPerSessions!=null ) {
|
||||
this.maxMessagesPerSessions = maxMessagesPerSessions;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public String getMaxSessions() {
|
||||
return maxSessions;
|
||||
}
|
||||
public void setMaxSessions(String maxSessions) {
|
||||
if( maxSessions!=null ) {
|
||||
this.maxSessions = maxSessions;
|
||||
}
|
||||
}
|
||||
|
||||
public String getUseRAManagedTransaction() {
|
||||
return useRAManagedTransaction;
|
||||
}
|
||||
|
||||
public void setUseRAManagedTransaction(String useRAManagedTransaction) {
|
||||
if( useRAManagedTransaction!=null ) {
|
||||
this.useRAManagedTransaction = useRAManagedTransaction;
|
||||
}
|
||||
}
|
||||
|
||||
public int getMaxMessagesPerSessionsIntValue() {
|
||||
return Integer.parseInt(maxMessagesPerSessions);
|
||||
}
|
||||
|
||||
public int getMaxSessionsIntValue() {
|
||||
return Integer.parseInt(maxSessions);
|
||||
}
|
||||
|
||||
public boolean isUseRAManagedTransactionEnabled() {
|
||||
return new Boolean(useRAManagedTransaction).booleanValue();
|
||||
}
|
||||
|
||||
public boolean getNoLocalBooleanValue() {
|
||||
return new Boolean(noLocal).booleanValue();
|
||||
}
|
||||
|
||||
public String getEnableBatch() {
|
||||
return enableBatch;
|
||||
}
|
||||
|
||||
public void setEnableBatch(String enableBatch) {
|
||||
if (enableBatch != null) {
|
||||
this.enableBatch = enableBatch;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getEnableBatchBooleanValue() {
|
||||
return new Boolean(enableBatch).booleanValue();
|
||||
}
|
||||
|
||||
public int getMaxMessagesPerBatchIntValue() {
|
||||
return Integer.parseInt(maxMessagesPerBatch);
|
||||
}
|
||||
|
||||
public String getMaxMessagesPerBatch() {
|
||||
return maxMessagesPerBatch.toString();
|
||||
}
|
||||
|
||||
public void setMaxMessagesPerBatch(String maxMessagesPerBatch) {
|
||||
if (maxMessagesPerBatch != null) {
|
||||
this.maxMessagesPerBatch = maxMessagesPerBatch;
|
||||
}
|
||||
}
|
||||
|
||||
public short getBackOffMultiplier() {
|
||||
if (redeliveryPolicy == null) {
|
||||
return 0;
|
||||
}
|
||||
return redeliveryPolicy.getBackOffMultiplier();
|
||||
}
|
||||
|
||||
public long getInitialRedeliveryDelay() {
|
||||
if (redeliveryPolicy == null) {
|
||||
return 0;
|
||||
}
|
||||
return redeliveryPolicy.getInitialRedeliveryDelay();
|
||||
}
|
||||
|
||||
public int getMaximumRedeliveries() {
|
||||
if (redeliveryPolicy == null) {
|
||||
return 0;
|
||||
}
|
||||
return redeliveryPolicy.getMaximumRedeliveries();
|
||||
}
|
||||
|
||||
public boolean isUseExponentialBackOff() {
|
||||
if (redeliveryPolicy == null) {
|
||||
return false;
|
||||
}
|
||||
return redeliveryPolicy.isUseExponentialBackOff();
|
||||
}
|
||||
|
||||
public void setBackOffMultiplier(short backOffMultiplier) {
|
||||
lazyCreateRedeliveryPolicy().setBackOffMultiplier(backOffMultiplier);
|
||||
}
|
||||
|
||||
public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
|
||||
lazyCreateRedeliveryPolicy().setInitialRedeliveryDelay(initialRedeliveryDelay);
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(int maximumRedeliveries) {
|
||||
lazyCreateRedeliveryPolicy().setMaximumRedeliveries(maximumRedeliveries);
|
||||
}
|
||||
|
||||
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
||||
lazyCreateRedeliveryPolicy().setUseExponentialBackOff(useExponentialBackOff);
|
||||
}
|
||||
|
||||
// don't use getter to avoid causing introspection errors in containers
|
||||
public RedeliveryPolicy redeliveryPolicy() {
|
||||
return redeliveryPolicy;
|
||||
}
|
||||
|
||||
protected RedeliveryPolicy lazyCreateRedeliveryPolicy() {
|
||||
if (redeliveryPolicy == null) {
|
||||
redeliveryPolicy = new RedeliveryPolicy();
|
||||
}
|
||||
return redeliveryPolicy;
|
||||
}
|
||||
}
|
||||
|
|
@ -22,12 +22,12 @@ import javax.resource.spi.endpoint.MessageEndpointFactory;
|
|||
|
||||
public class ActiveMQEndpointActivationKey {
|
||||
final private MessageEndpointFactory messageEndpointFactory;
|
||||
final private ActiveMQActivationSpec activationSpec;
|
||||
final private MessageActivationSpec activationSpec;
|
||||
|
||||
/**
|
||||
* @return Returns the activationSpec.
|
||||
*/
|
||||
public ActiveMQActivationSpec getActivationSpec() {
|
||||
public MessageActivationSpec getActivationSpec() {
|
||||
return activationSpec;
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,7 @@ public class ActiveMQEndpointActivationKey {
|
|||
* @param messageEndpointFactory
|
||||
* @param activationSpec
|
||||
*/
|
||||
public ActiveMQEndpointActivationKey(MessageEndpointFactory messageEndpointFactory, ActiveMQActivationSpec activationSpec) {
|
||||
public ActiveMQEndpointActivationKey(MessageEndpointFactory messageEndpointFactory, MessageActivationSpec activationSpec) {
|
||||
this.messageEndpointFactory = messageEndpointFactory;
|
||||
this.activationSpec = activationSpec;
|
||||
}
|
||||
|
|
|
@ -46,6 +46,10 @@ import org.apache.commons.logging.LogFactory;
|
|||
public class ActiveMQEndpointWorker {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ActiveMQEndpointWorker.class);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public static final Method ON_MESSAGE_METHOD;
|
||||
|
||||
private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
|
||||
|
@ -61,7 +65,7 @@ public class ActiveMQEndpointWorker {
|
|||
}
|
||||
}
|
||||
|
||||
protected ActiveMQResourceAdapter adapter;
|
||||
protected MessageResourceAdapter adapter;
|
||||
protected ActiveMQEndpointActivationKey endpointActivationKey;
|
||||
protected MessageEndpointFactory endpointFactory;
|
||||
protected WorkManager workManager;
|
||||
|
@ -88,6 +92,7 @@ public class ActiveMQEndpointWorker {
|
|||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,6 +106,7 @@ public class ActiveMQEndpointWorker {
|
|||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,10 +120,14 @@ public class ActiveMQEndpointWorker {
|
|||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
|
||||
this.endpointActivationKey = key;
|
||||
this.adapter = adapter;
|
||||
this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
|
||||
|
@ -132,6 +142,7 @@ public class ActiveMQEndpointWorker {
|
|||
connectWork = new Work() {
|
||||
|
||||
public void release() {
|
||||
//
|
||||
}
|
||||
|
||||
synchronized public void run() {
|
||||
|
@ -140,7 +151,7 @@ public class ActiveMQEndpointWorker {
|
|||
if( connection!=null )
|
||||
return;
|
||||
|
||||
ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
|
||||
MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
|
||||
try {
|
||||
connection = adapter.makeConnection(activationSpec);
|
||||
connection.start();
|
||||
|
@ -176,7 +187,7 @@ public class ActiveMQEndpointWorker {
|
|||
}
|
||||
};
|
||||
|
||||
ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
|
||||
MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
|
||||
if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
|
||||
dest = new ActiveMQQueue(activationSpec.getDestination());
|
||||
} else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
|
||||
|
@ -187,6 +198,9 @@ public class ActiveMQEndpointWorker {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
synchronized public void start() throws WorkException, ResourceException {
|
||||
if (running)
|
||||
return;
|
||||
|
@ -257,7 +271,9 @@ public class ActiveMQEndpointWorker {
|
|||
this.reconnectDelay=MAX_RECONNECT_DELAY;
|
||||
}
|
||||
connect();
|
||||
} catch(InterruptedException e) {}
|
||||
} catch(InterruptedException e) {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
protected void registerThreadSession(Session session) {
|
||||
|
|
|
@ -42,12 +42,18 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
|
|||
|
||||
private static final long serialVersionUID = 6196921962230582875L;
|
||||
|
||||
private ActiveMQResourceAdapter adapter;
|
||||
private MessageResourceAdapter adapter;
|
||||
private PrintWriter logWriter;
|
||||
private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter)
|
||||
*/
|
||||
public void setResourceAdapter(ResourceAdapter adapter) throws ResourceException {
|
||||
this.adapter = (ActiveMQResourceAdapter) adapter;
|
||||
if (!(adapter instanceof MessageResourceAdapter)) {
|
||||
throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName());
|
||||
}
|
||||
this.adapter = (MessageResourceAdapter) adapter;
|
||||
ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy();
|
||||
if (info.getClientid() == null)
|
||||
info.setClientid(baseInfo.getClientid());
|
||||
|
@ -61,6 +67,10 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
|
|||
info.setUserName(baseInfo.getUserName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.lang.Object#equals(java.lang.Object)
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object object) {
|
||||
if( object == null || object.getClass()!=ActiveMQManagedConnectionFactory.class ) {
|
||||
return false;
|
||||
|
@ -68,11 +78,18 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
|
|||
return ((ActiveMQManagedConnectionFactory)object).info.equals(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.lang.Object#hashCode()
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return info.hashCode();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
|
||||
*/
|
||||
public ResourceAdapter getResourceAdapter() {
|
||||
return adapter;
|
||||
}
|
||||
|
@ -155,117 +172,204 @@ public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactor
|
|||
//
|
||||
// /////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public String getClientid() {
|
||||
return info.getClientid();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public String getPassword() {
|
||||
return info.getPassword();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public String getUserName() {
|
||||
return info.getUserName();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setClientid(String clientid) {
|
||||
info.setClientid(clientid);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setPassword(String password) {
|
||||
info.setPassword(password);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setUserName(String userid) {
|
||||
info.setUserName(userid);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Boolean getUseInboundSession() {
|
||||
return info.getUseInboundSession();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setUseInboundSession(Boolean useInboundSession) {
|
||||
info.setUseInboundSession(useInboundSession);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public boolean isUseInboundSessionEnabled() {
|
||||
return info.isUseInboundSessionEnabled();
|
||||
}
|
||||
|
||||
// Redelivery policy configuration
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Long getInitialRedeliveryDelay() {
|
||||
return info.getInitialRedeliveryDelay();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getMaximumRedeliveries() {
|
||||
return info.getMaximumRedeliveries();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Short getRedeliveryBackOffMultiplier() {
|
||||
return info.getRedeliveryBackOffMultiplier();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Boolean getRedeliveryUseExponentialBackOff() {
|
||||
return info.getRedeliveryUseExponentialBackOff();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setInitialRedeliveryDelay(Long value) {
|
||||
info.setInitialRedeliveryDelay(value);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setMaximumRedeliveries(Integer value) {
|
||||
info.setMaximumRedeliveries(value);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setRedeliveryBackOffMultiplier(Short value) {
|
||||
info.setRedeliveryBackOffMultiplier(value);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setRedeliveryUseExponentialBackOff(Boolean value) {
|
||||
info.setRedeliveryUseExponentialBackOff(value);
|
||||
}
|
||||
|
||||
|
||||
// Prefetch policy configuration
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getDurableTopicPrefetch() {
|
||||
return info.getDurableTopicPrefetch();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getInputStreamPrefetch() {
|
||||
return info.getInputStreamPrefetch();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getQueueBrowserPrefetch() {
|
||||
return info.getQueueBrowserPrefetch();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getQueuePrefetch() {
|
||||
return info.getQueuePrefetch();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public Integer getTopicPrefetch() {
|
||||
return info.getTopicPrefetch();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setAllPrefetchValues(Integer i) {
|
||||
info.setAllPrefetchValues(i);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
|
||||
info.setDurableTopicPrefetch(durableTopicPrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
|
||||
info.setInputStreamPrefetch(inputStreamPrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
|
||||
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public void setQueuePrefetch(Integer queuePrefetch) {
|
||||
info.setQueuePrefetch(queuePrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param topicPrefetch
|
||||
*/
|
||||
public void setTopicPrefetch(Integer topicPrefetch) {
|
||||
info.setTopicPrefetch(topicPrefetch);
|
||||
}
|
||||
|
|
|
@ -1,511 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.XAConnection;
|
||||
import javax.jms.XASession;
|
||||
import javax.resource.NotSupportedException;
|
||||
import javax.resource.ResourceException;
|
||||
import javax.resource.spi.ActivationSpec;
|
||||
import javax.resource.spi.BootstrapContext;
|
||||
import javax.resource.spi.ResourceAdapter;
|
||||
import javax.resource.spi.ResourceAdapterInternalException;
|
||||
import javax.resource.spi.endpoint.MessageEndpointFactory;
|
||||
import javax.transaction.xa.XAResource;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
|
||||
* and deliver messages to those end points using the connection configure in the
|
||||
* resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
|
||||
*
|
||||
* @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
|
||||
* description="The JCA Resource Adaptor for ActiveMQ"
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -5417363537865649130L;
|
||||
private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
|
||||
|
||||
private final HashMap endpointWorkers = new HashMap();
|
||||
private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
|
||||
|
||||
private BootstrapContext bootstrapContext;
|
||||
private String brokerXmlConfig;
|
||||
private BrokerService broker;
|
||||
private ActiveMQConnectionFactory connectionFactory;
|
||||
|
||||
public ActiveMQResourceAdapter() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
|
||||
*/
|
||||
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
|
||||
this.bootstrapContext = bootstrapContext;
|
||||
if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) {
|
||||
try {
|
||||
broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
|
||||
broker.start();
|
||||
} catch (Throwable e) {
|
||||
throw new ResourceAdapterInternalException("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ActiveMQConnection makeConnection() throws JMSException {
|
||||
if (connectionFactory != null) {
|
||||
return makeConnection(info, connectionFactory);
|
||||
}
|
||||
else {
|
||||
return makeConnection(info);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException {
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
|
||||
return makeConnection(info, connectionFactory);
|
||||
}
|
||||
|
||||
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException {
|
||||
String userName = info.getUserName();
|
||||
String password = info.getPassword();
|
||||
ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
|
||||
|
||||
String clientId = info.getClientid();
|
||||
if (clientId != null && clientId.length() > 0) {
|
||||
physicalConnection.setClientID(clientId);
|
||||
}
|
||||
return physicalConnection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param activationSpec
|
||||
*/
|
||||
public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException {
|
||||
ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
|
||||
String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
|
||||
String password = defaultValue(activationSpec.getPassword(), info.getPassword());
|
||||
String clientId = activationSpec.getClientId();
|
||||
if (clientId != null) {
|
||||
connectionFactory.setClientID(clientId);
|
||||
}
|
||||
else {
|
||||
if (activationSpec.isDurableSubscription()) {
|
||||
log.warn("No clientID specified for durable subscription: " + activationSpec);
|
||||
}
|
||||
}
|
||||
ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
|
||||
|
||||
// have we configured a redelivery policy
|
||||
RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
|
||||
if (redeliveryPolicy != null) {
|
||||
physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
|
||||
}
|
||||
return physicalConnection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param info
|
||||
* @return
|
||||
* @throws JMSException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException {
|
||||
ActiveMQConnectionFactory factory = connectionFactory;
|
||||
if (factory != null && info.isConnectionFactoryConfigured()) {
|
||||
factory = factory.copy();
|
||||
}
|
||||
else if (factory == null) {
|
||||
factory = new ActiveMQConnectionFactory();
|
||||
}
|
||||
info.configure(factory);
|
||||
return factory;
|
||||
}
|
||||
|
||||
private String defaultValue(String value, String defaultValue) {
|
||||
if (value != null)
|
||||
return value;
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapter#stop()
|
||||
*/
|
||||
public void stop() {
|
||||
while (endpointWorkers.size() > 0) {
|
||||
ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
|
||||
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
|
||||
}
|
||||
if (broker != null) {
|
||||
ServiceSupport.dispose(broker);
|
||||
broker = null;
|
||||
}
|
||||
this.bootstrapContext = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public BootstrapContext getBootstrapContext() {
|
||||
return bootstrapContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
|
||||
* javax.resource.spi.ActivationSpec)
|
||||
*/
|
||||
public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
|
||||
throws ResourceException {
|
||||
|
||||
// spec section 5.3.3
|
||||
if (activationSpec.getResourceAdapter() != this) {
|
||||
throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
|
||||
}
|
||||
|
||||
if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
|
||||
|
||||
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
|
||||
(ActiveMQActivationSpec) activationSpec);
|
||||
// This is weird.. the same endpoint activated twice.. must be a
|
||||
// container error.
|
||||
if (endpointWorkers.containsKey(key)) {
|
||||
throw new IllegalStateException("Endpoint previously activated");
|
||||
}
|
||||
|
||||
ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
|
||||
|
||||
endpointWorkers.put(key, worker);
|
||||
worker.start();
|
||||
|
||||
} else {
|
||||
throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
|
||||
* javax.resource.spi.ActivationSpec)
|
||||
*/
|
||||
public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
|
||||
|
||||
if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
|
||||
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec);
|
||||
ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key);
|
||||
if (worker == null) {
|
||||
// This is weird.. that endpoint was not activated.. oh well..
|
||||
// this method
|
||||
// does not throw exceptions so just return.
|
||||
return;
|
||||
}
|
||||
try {
|
||||
worker.stop();
|
||||
} catch (InterruptedException e) {
|
||||
// We interrupted.. we won't throw an exception but will stop
|
||||
// waiting for the worker
|
||||
// to stop.. we tried our best. Keep trying to interrupt the
|
||||
// thread.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* We only connect to one resource manager per ResourceAdapter instance, so
|
||||
* any ActivationSpec will return the same XAResource.
|
||||
*
|
||||
* @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
|
||||
*/
|
||||
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
|
||||
Connection connection = null;
|
||||
try {
|
||||
connection = makeConnection();
|
||||
if (connection instanceof XAConnection) {
|
||||
XASession session = ((XAConnection) connection).createXASession();
|
||||
XAResource xaResource = session.getXAResource();
|
||||
return new XAResource[] { xaResource };
|
||||
} else {
|
||||
return new XAResource[] {};
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new ResourceException(e);
|
||||
} finally {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ///////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Java Bean getters and setters for this ResourceAdapter class.
|
||||
//
|
||||
// ///////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public String getClientid() {
|
||||
return emptyToNull(info.getClientid());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public String getPassword() {
|
||||
return emptyToNull(info.getPassword());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public String getServerUrl() {
|
||||
return info.getServerUrl();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public String getUserName() {
|
||||
return emptyToNull(info.getUserName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param clientid
|
||||
*/
|
||||
public void setClientid(String clientid) {
|
||||
info.setClientid(clientid);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param password
|
||||
*/
|
||||
public void setPassword(String password) {
|
||||
info.setPassword(password);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param url
|
||||
*/
|
||||
public void setServerUrl(String url) {
|
||||
info.setServerUrl(url);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param userid
|
||||
*/
|
||||
public void setUserName(String userid) {
|
||||
info.setUserName(userid);
|
||||
}
|
||||
|
||||
public String getBrokerXmlConfig() {
|
||||
return brokerXmlConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the <a href="http://activemq.org/Xml+Configuration">XML
|
||||
* configuration file </a> used to configure the ActiveMQ broker via Spring
|
||||
* if using embedded mode.
|
||||
*
|
||||
* @param brokerXmlConfig
|
||||
* is the filename which is assumed to be on the classpath unless
|
||||
* a URL is specified. So a value of <code>foo/bar.xml</code>
|
||||
* would be assumed to be on the classpath whereas
|
||||
* <code>file:dir/file.xml</code> would use the file system.
|
||||
* Any valid URL string is supported.
|
||||
* @see #setUseEmbeddedBroker(Boolean)
|
||||
*/
|
||||
public void setBrokerXmlConfig(String brokerXmlConfig) {
|
||||
this.brokerXmlConfig=brokerXmlConfig;
|
||||
}
|
||||
|
||||
public Integer getDurableTopicPrefetch() {
|
||||
return info.getDurableTopicPrefetch();
|
||||
}
|
||||
|
||||
public Long getInitialRedeliveryDelay() {
|
||||
return info.getInitialRedeliveryDelay();
|
||||
}
|
||||
|
||||
public Integer getInputStreamPrefetch() {
|
||||
return info.getInputStreamPrefetch();
|
||||
}
|
||||
|
||||
public Integer getMaximumRedeliveries() {
|
||||
return info.getMaximumRedeliveries();
|
||||
}
|
||||
|
||||
public Integer getQueueBrowserPrefetch() {
|
||||
return info.getQueueBrowserPrefetch();
|
||||
}
|
||||
|
||||
public Integer getQueuePrefetch() {
|
||||
return info.getQueuePrefetch();
|
||||
}
|
||||
|
||||
public Short getRedeliveryBackOffMultiplier() {
|
||||
return info.getRedeliveryBackOffMultiplier();
|
||||
}
|
||||
|
||||
public Boolean getRedeliveryUseExponentialBackOff() {
|
||||
return info.getRedeliveryUseExponentialBackOff();
|
||||
}
|
||||
|
||||
public Integer getTopicPrefetch() {
|
||||
return info.getTopicPrefetch();
|
||||
}
|
||||
|
||||
public boolean isUseInboundSessionEnabled() {
|
||||
return info.isUseInboundSessionEnabled();
|
||||
}
|
||||
|
||||
public void setAllPrefetchValues(Integer i) {
|
||||
info.setAllPrefetchValues(i);
|
||||
}
|
||||
|
||||
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
|
||||
info.setDurableTopicPrefetch(durableTopicPrefetch);
|
||||
}
|
||||
|
||||
public void setInitialRedeliveryDelay(Long value) {
|
||||
info.setInitialRedeliveryDelay(value);
|
||||
}
|
||||
|
||||
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
|
||||
info.setInputStreamPrefetch(inputStreamPrefetch);
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(Integer value) {
|
||||
info.setMaximumRedeliveries(value);
|
||||
}
|
||||
|
||||
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
|
||||
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
|
||||
}
|
||||
|
||||
public void setQueuePrefetch(Integer queuePrefetch) {
|
||||
info.setQueuePrefetch(queuePrefetch);
|
||||
}
|
||||
|
||||
public void setRedeliveryBackOffMultiplier(Short value) {
|
||||
info.setRedeliveryBackOffMultiplier(value);
|
||||
}
|
||||
|
||||
public void setRedeliveryUseExponentialBackOff(Boolean value) {
|
||||
info.setRedeliveryUseExponentialBackOff(value);
|
||||
}
|
||||
|
||||
public void setTopicPrefetch(Integer topicPrefetch) {
|
||||
info.setTopicPrefetch(topicPrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the info.
|
||||
*/
|
||||
public ActiveMQConnectionRequestInfo getInfo() {
|
||||
return info;
|
||||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof ActiveMQResourceAdapter)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o;
|
||||
|
||||
if (!info.equals(activeMQResourceAdapter.info)) {
|
||||
return false;
|
||||
}
|
||||
if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean notEqual(Object o1, Object o2) {
|
||||
return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
|
||||
}
|
||||
|
||||
|
||||
public int hashCode() {
|
||||
int result;
|
||||
result = info.hashCode();
|
||||
if( brokerXmlConfig !=null ) {
|
||||
result ^= brokerXmlConfig.hashCode();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private String emptyToNull(String value) {
|
||||
if (value == null || value.length() == 0) {
|
||||
return null;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public Boolean getUseInboundSession() {
|
||||
return info.getUseInboundSession();
|
||||
}
|
||||
|
||||
public void setUseInboundSession(Boolean useInboundSession) {
|
||||
info.setUseInboundSession(useInboundSession);
|
||||
}
|
||||
|
||||
public ActiveMQConnectionFactory getConnectionFactory() {
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
* This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging.
|
||||
* Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery
|
||||
* and prefetch policies; the properties on the connectionFactory will be used instead.
|
||||
*/
|
||||
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
|
||||
this.connectionFactory = connectionFactory;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* $Header$
|
||||
*
|
||||
* Broker Office ESPECIAL (Release) - org.apache.activemq.ra
|
||||
*
|
||||
* Copyright (C) 2005-2007 Norvax, Inc.
|
||||
* All Rights Reserved
|
||||
*
|
||||
* This is UNPUBLISHED PROPRIETARY SOURCE CODE of Norvax, Inc.; the contents
|
||||
* of this file may not be disclosed to third parties, copied or duplicated
|
||||
* in any form, in whole or in part, without the prior written permission of
|
||||
* Norvax, Inc. The copyright notice above does not evidence any actual or
|
||||
* intended publication of such source code.
|
||||
*
|
||||
* Permission is hereby granted solely to the licensee for use of this source
|
||||
* code in its unaltered state. This source code may not be modified by
|
||||
* licensee except under direction of Norvax, Inc. Neither may this source
|
||||
* code be given under any circumstances to non-licensees in any form,
|
||||
* including source or binary. Modification of this source constitutes breach
|
||||
* of contract, which voids any potential pending support responsibilities by
|
||||
* Norvax, Inc. Divulging the exact or paraphrased contents of this source
|
||||
* code to unlicensed parties either directly or indirectly constitutes
|
||||
* violation of federal and international copyright and trade secret laws, and
|
||||
* will be duly prosecuted to the fullest extent permitted under law.
|
||||
*
|
||||
* This software is provided by Norvax, Inc. ``as is'' and any express or
|
||||
* implied warranties, including, but not limited to, the implied warranties
|
||||
* of merchantability and fitness for a particular purpose are disclaimed. In
|
||||
* no event shall the regents or contributors be liable for any direct,
|
||||
* indirect, incidental, special, exemplary, or consequential damages
|
||||
* (including, but not limited to, procurement of substitute goods or
|
||||
* services; loss of use, data, or profits; or business interruption) however
|
||||
* caused and on any theory of liability, whether in contract, strict
|
||||
* liability, or tort (including negligence or otherwise) arising in any way
|
||||
* out of the use of this software, even if advised of the possibility of such
|
||||
* damage.
|
||||
*
|
||||
**/
|
||||
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import javax.resource.spi.ActivationSpec;
|
||||
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
|
||||
/**
|
||||
* Description: Description goes here.
|
||||
*
|
||||
* @author <a href="mailto:cstach@norvax.com">Christopher G. Stach II</a>
|
||||
* @version $Revision$ $Date$
|
||||
* @since 0.1
|
||||
*/
|
||||
public interface MessageActivationSpec
|
||||
extends ActivationSpec
|
||||
{
|
||||
|
||||
/**
|
||||
*/
|
||||
String getClientId();
|
||||
|
||||
/**
|
||||
*/
|
||||
boolean isDurableSubscription();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getPassword();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getUserName();
|
||||
|
||||
/**
|
||||
*/
|
||||
RedeliveryPolicy redeliveryPolicy();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getSubscriptionName();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getMessageSelector();
|
||||
|
||||
/**
|
||||
*/
|
||||
int getMaxMessagesPerSessionsIntValue();
|
||||
|
||||
/**
|
||||
*/
|
||||
boolean getNoLocalBooleanValue();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getDestinationType();
|
||||
|
||||
/**
|
||||
*/
|
||||
String getDestination();
|
||||
|
||||
/**
|
||||
*/
|
||||
int getMaxSessionsIntValue();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
int getAcknowledgeModeForSession();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
String getMaxMessagesPerSessions();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
boolean isUseRAManagedTransactionEnabled();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
String getEnableBatch();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
boolean getEnableBatchBooleanValue();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
int getMaxMessagesPerBatchIntValue();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
String getMaxMessagesPerBatch();
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.resource.spi.BootstrapContext;
|
||||
import javax.resource.spi.ResourceAdapter;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
/**
|
||||
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
|
||||
* and deliver messages to those end points using the connection configure in
|
||||
* the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
|
||||
*
|
||||
* @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
|
||||
* description="The JCA Resource Adaptor for ActiveMQ"
|
||||
* @version $Revision$
|
||||
*/
|
||||
public interface MessageResourceAdapter
|
||||
extends ResourceAdapter
|
||||
{
|
||||
|
||||
/**
|
||||
*/
|
||||
public ActiveMQConnection makeConnection()
|
||||
throws JMSException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info)
|
||||
throws JMSException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory)
|
||||
throws JMSException;
|
||||
|
||||
/**
|
||||
* @param activationSpec
|
||||
*/
|
||||
public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec)
|
||||
throws JMSException;
|
||||
|
||||
/**
|
||||
* @return bootstrap context
|
||||
*/
|
||||
public BootstrapContext getBootstrapContext();
|
||||
|
||||
/**
|
||||
*/
|
||||
public String getBrokerXmlConfig();
|
||||
|
||||
/**
|
||||
* @return Returns the info.
|
||||
*/
|
||||
public ActiveMQConnectionRequestInfo getInfo();
|
||||
|
||||
/**
|
||||
*/
|
||||
public ActiveMQConnectionFactory getConnectionFactory();
|
||||
|
||||
}
|
|
@ -17,9 +17,7 @@
|
|||
*/
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
@ -58,7 +56,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
|
|||
}
|
||||
|
||||
private ServerSessionImpl createServerSessionImpl() throws JMSException {
|
||||
ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
|
||||
MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
|
||||
int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
|
||||
final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge);
|
||||
MessageEndpoint endpoint;
|
||||
|
@ -130,7 +128,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param message
|
||||
* @param messageDispatch the message to dispatch
|
||||
* @throws JMSException
|
||||
*/
|
||||
private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException {
|
||||
|
|
Loading…
Reference in New Issue