mirror of https://github.com/apache/activemq.git
allow customisation of the redelivery policy and prefetch policy on the ActiveMQ ResourceAdapter and ActiveMQManagedConnectionFactory and fix AMQ-467
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c5b3802c61
commit
205ac1188a
|
@ -16,10 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq.ra;
|
||||
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
|
||||
import javax.resource.spi.ConnectionRequestInfo;
|
||||
import java.io.Serializable;
|
||||
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*
|
||||
|
@ -34,17 +36,20 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
private String serverUrl;
|
||||
private String clientid;
|
||||
private Boolean useInboundSession;
|
||||
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
|
||||
|
||||
public ActiveMQConnectionRequestInfo copy() {
|
||||
try {
|
||||
return (ActiveMQConnectionRequestInfo) clone();
|
||||
ActiveMQConnectionRequestInfo answer = (ActiveMQConnectionRequestInfo) clone();
|
||||
answer.redeliveryPolicy = redeliveryPolicy.copy();
|
||||
return answer;
|
||||
}
|
||||
catch (CloneNotSupportedException e) {
|
||||
throw new RuntimeException("Could not clone: ", e);
|
||||
throw new RuntimeException("Could not clone: " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ConnectionRequestInfo#hashCode()
|
||||
*/
|
||||
|
@ -59,7 +64,6 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
return rc;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see javax.resource.spi.ConnectionRequestInfo#equals(java.lang.Object)
|
||||
*/
|
||||
|
@ -71,16 +75,15 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
return false;
|
||||
}
|
||||
ActiveMQConnectionRequestInfo i = (ActiveMQConnectionRequestInfo) o;
|
||||
if ( notEqual(serverUrl, i.serverUrl) ) {
|
||||
if (notEqual(serverUrl, i.serverUrl)) {
|
||||
return false;
|
||||
}
|
||||
if ( notEqual(useInboundSession, i.useInboundSession) ) {
|
||||
if (notEqual(useInboundSession, i.useInboundSession)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param i
|
||||
* @return
|
||||
|
@ -97,7 +100,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
}
|
||||
|
||||
/**
|
||||
* @param url The url to set.
|
||||
* @param url
|
||||
* The url to set.
|
||||
*/
|
||||
public void setServerUrl(String url) {
|
||||
this.serverUrl = url;
|
||||
|
@ -111,7 +115,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
}
|
||||
|
||||
/**
|
||||
* @param password The password to set.
|
||||
* @param password
|
||||
* The password to set.
|
||||
*/
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
|
@ -125,7 +130,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
}
|
||||
|
||||
/**
|
||||
* @param userid The userid to set.
|
||||
* @param userid
|
||||
* The userid to set.
|
||||
*/
|
||||
public void setUserName(String userid) {
|
||||
this.userName = userid;
|
||||
|
@ -139,34 +145,139 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
|
|||
}
|
||||
|
||||
/**
|
||||
* @param clientid The clientid to set.
|
||||
* @param clientid
|
||||
* The clientid to set.
|
||||
*/
|
||||
public void setClientid(String clientid) {
|
||||
this.clientid = clientid;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "ActiveMQConnectionRequestInfo{ " +
|
||||
"userName = '" + userName + "' " +
|
||||
", serverUrl = '" + serverUrl + "' " +
|
||||
", clientid = '" + clientid + "' " +
|
||||
", userName = '" + userName + "' " +
|
||||
", useInboundSession = '" + useInboundSession + "' " +
|
||||
" }";
|
||||
return "ActiveMQConnectionRequestInfo{ " + "userName = '" + userName + "' " + ", serverUrl = '" + serverUrl + "' " + ", clientid = '" + clientid + "' "
|
||||
+ ", userName = '" + userName + "' " + ", useInboundSession = '" + useInboundSession + "' " + " }";
|
||||
}
|
||||
|
||||
|
||||
public Boolean getUseInboundSession() {
|
||||
return useInboundSession;
|
||||
}
|
||||
|
||||
|
||||
public void setUseInboundSession(Boolean useInboundSession) {
|
||||
this.useInboundSession = useInboundSession;
|
||||
}
|
||||
|
||||
|
||||
public boolean isUseInboundSessionEnabled() {
|
||||
return useInboundSession!=null && useInboundSession.booleanValue();
|
||||
return useInboundSession != null && useInboundSession.booleanValue();
|
||||
}
|
||||
|
||||
public Short getRedeliveryBackOffMultiplier() {
|
||||
return new Short(redeliveryPolicy.getBackOffMultiplier());
|
||||
}
|
||||
|
||||
public Long getInitialRedeliveryDelay() {
|
||||
return new Long(redeliveryPolicy.getInitialRedeliveryDelay());
|
||||
}
|
||||
|
||||
public Integer getMaximumRedeliveries() {
|
||||
return new Integer(redeliveryPolicy.getMaximumRedeliveries());
|
||||
}
|
||||
|
||||
public Boolean getRedeliveryUseExponentialBackOff() {
|
||||
return new Boolean(redeliveryPolicy.isUseExponentialBackOff());
|
||||
}
|
||||
|
||||
public void setRedeliveryBackOffMultiplier(Short value) {
|
||||
if (value != null) {
|
||||
redeliveryPolicy.setBackOffMultiplier(value.shortValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setInitialRedeliveryDelay(Long value) {
|
||||
if (value != null) {
|
||||
redeliveryPolicy.setInitialRedeliveryDelay(value.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(Integer value) {
|
||||
if (value != null) {
|
||||
redeliveryPolicy.setMaximumRedeliveries(value.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setRedeliveryUseExponentialBackOff(Boolean value) {
|
||||
if (value != null) {
|
||||
redeliveryPolicy.setUseExponentialBackOff(value.booleanValue());
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getDurableTopicPrefetch() {
|
||||
return new Integer(prefetchPolicy.getDurableTopicPrefetch());
|
||||
}
|
||||
|
||||
public Integer getInputStreamPrefetch() {
|
||||
return new Integer(prefetchPolicy.getInputStreamPrefetch());
|
||||
}
|
||||
|
||||
public Integer getQueueBrowserPrefetch() {
|
||||
return new Integer(prefetchPolicy.getQueueBrowserPrefetch());
|
||||
}
|
||||
|
||||
public Integer getQueuePrefetch() {
|
||||
return new Integer(prefetchPolicy.getQueuePrefetch());
|
||||
}
|
||||
|
||||
public Integer getTopicPrefetch() {
|
||||
return new Integer(prefetchPolicy.getTopicPrefetch());
|
||||
}
|
||||
|
||||
public void setAllPrefetchValues(Integer i) {
|
||||
if (i != null) {
|
||||
prefetchPolicy.setAll(i.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
|
||||
if (durableTopicPrefetch != null) {
|
||||
prefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
|
||||
if (inputStreamPrefetch != null) {
|
||||
prefetchPolicy.setInputStreamPrefetch(inputStreamPrefetch.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
|
||||
if (queueBrowserPrefetch != null) {
|
||||
prefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setQueuePrefetch(Integer queuePrefetch) {
|
||||
if (queuePrefetch != null) {
|
||||
prefetchPolicy.setQueuePrefetch(queuePrefetch.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void setTopicPrefetch(Integer topicPrefetch) {
|
||||
if (topicPrefetch != null) {
|
||||
prefetchPolicy.setTopicPrefetch(topicPrefetch.intValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the redelivery policy; not using bean properties to avoid
|
||||
* breaking compatibility with JCA configuration in J2EE
|
||||
*/
|
||||
public RedeliveryPolicy redeliveryPolicy() {
|
||||
return redeliveryPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the prefetch policy; not using bean properties to avoid
|
||||
* breaking compatibility with JCA configuration in J2EE
|
||||
*/
|
||||
public ActiveMQPrefetchPolicy prefetchPolicy() {
|
||||
return prefetchPolicy;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,8 +35,7 @@ import javax.security.auth.Subject;
|
|||
*
|
||||
* TODO: Must override equals and hashCode (JCA spec 16.4)
|
||||
*/
|
||||
public class ActiveMQManagedConnectionFactory implements
|
||||
ManagedConnectionFactory, ResourceAdapterAssociation {
|
||||
public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation {
|
||||
|
||||
private static final long serialVersionUID = 6196921962230582875L;
|
||||
|
||||
|
@ -47,15 +46,15 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
public void setResourceAdapter(ResourceAdapter adapter) throws ResourceException {
|
||||
this.adapter = (ActiveMQResourceAdapter) adapter;
|
||||
ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy();
|
||||
if( info.getClientid()==null )
|
||||
if (info.getClientid() == null)
|
||||
info.setClientid(baseInfo.getClientid());
|
||||
if( info.getPassword()==null )
|
||||
if (info.getPassword() == null)
|
||||
info.setPassword(baseInfo.getPassword());
|
||||
if( info.getServerUrl()==null )
|
||||
if (info.getServerUrl() == null)
|
||||
info.setServerUrl(baseInfo.getServerUrl());
|
||||
if( info.getUseInboundSession()==null )
|
||||
if (info.getUseInboundSession() == null)
|
||||
info.setUseInboundSession(baseInfo.getUseInboundSession());
|
||||
if( info.getUserName()==null )
|
||||
if (info.getUserName() == null)
|
||||
info.setUserName(baseInfo.getUserName());
|
||||
}
|
||||
|
||||
|
@ -73,8 +72,8 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
/**
|
||||
* This is used when not running in an app server. For now we are creating a
|
||||
* ConnectionFactory that has our SimpleConnectionManager implementation but
|
||||
* it may be a better idea to not support this. The JMS api will have many quirks
|
||||
* the user may not expect when running through the resource adapter.
|
||||
* it may be a better idea to not support this. The JMS api will have many
|
||||
* quirks the user may not expect when running through the resource adapter.
|
||||
*
|
||||
* @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory()
|
||||
*/
|
||||
|
@ -88,9 +87,10 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
*/
|
||||
public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
|
||||
try {
|
||||
ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo)info;
|
||||
ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo) info;
|
||||
return new ActiveMQManagedConnection(subject, adapter.makeConnection(amqInfo), amqInfo);
|
||||
} catch (JMSException e) {
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw new ResourceException("Could not create connection.", e);
|
||||
}
|
||||
}
|
||||
|
@ -108,7 +108,8 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
try {
|
||||
c.associate(subject, (ActiveMQConnectionRequestInfo) info);
|
||||
return c;
|
||||
} catch (JMSException e) {
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw new ResourceException(e);
|
||||
}
|
||||
}
|
||||
|
@ -130,11 +131,11 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
return logWriter;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// /////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Bean setters and getters.
|
||||
//
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// /////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public String getClientid() {
|
||||
return info.getClientid();
|
||||
|
@ -175,4 +176,84 @@ public class ActiveMQManagedConnectionFactory implements
|
|||
public void setUseInboundSession(Boolean useInboundSession) {
|
||||
info.setUseInboundSession(useInboundSession);
|
||||
}
|
||||
|
||||
public Long getInitialRedeliveryDelay() {
|
||||
return info.getInitialRedeliveryDelay();
|
||||
}
|
||||
|
||||
public Integer getMaximumRedeliveries() {
|
||||
return info.getMaximumRedeliveries();
|
||||
}
|
||||
|
||||
public Short getRedeliveryBackOffMultiplier() {
|
||||
return info.getRedeliveryBackOffMultiplier();
|
||||
}
|
||||
|
||||
public Boolean getRedeliveryUseExponentialBackOff() {
|
||||
return info.getRedeliveryUseExponentialBackOff();
|
||||
}
|
||||
|
||||
public boolean isUseInboundSessionEnabled() {
|
||||
return info.isUseInboundSessionEnabled();
|
||||
}
|
||||
|
||||
public void setInitialRedeliveryDelay(Long value) {
|
||||
info.setInitialRedeliveryDelay(value);
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(Integer value) {
|
||||
info.setMaximumRedeliveries(value);
|
||||
}
|
||||
|
||||
public void setRedeliveryBackOffMultiplier(Short value) {
|
||||
info.setRedeliveryBackOffMultiplier(value);
|
||||
}
|
||||
|
||||
public Integer getDurableTopicPrefetch() {
|
||||
return info.getDurableTopicPrefetch();
|
||||
}
|
||||
|
||||
public Integer getInputStreamPrefetch() {
|
||||
return info.getInputStreamPrefetch();
|
||||
}
|
||||
|
||||
public Integer getQueueBrowserPrefetch() {
|
||||
return info.getQueueBrowserPrefetch();
|
||||
}
|
||||
|
||||
public Integer getQueuePrefetch() {
|
||||
return info.getQueuePrefetch();
|
||||
}
|
||||
|
||||
public Integer getTopicPrefetch() {
|
||||
return info.getTopicPrefetch();
|
||||
}
|
||||
|
||||
public void setAllPrefetchValues(Integer i) {
|
||||
info.setAllPrefetchValues(i);
|
||||
}
|
||||
|
||||
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
|
||||
info.setDurableTopicPrefetch(durableTopicPrefetch);
|
||||
}
|
||||
|
||||
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
|
||||
info.setInputStreamPrefetch(inputStreamPrefetch);
|
||||
}
|
||||
|
||||
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
|
||||
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
|
||||
}
|
||||
|
||||
public void setQueuePrefetch(Integer queuePrefetch) {
|
||||
info.setQueuePrefetch(queuePrefetch);
|
||||
}
|
||||
|
||||
public void setRedeliveryUseExponentialBackOff(Boolean value) {
|
||||
info.setRedeliveryUseExponentialBackOff(value);
|
||||
}
|
||||
|
||||
public void setTopicPrefetch(Integer topicPrefetch) {
|
||||
info.setTopicPrefetch(topicPrefetch);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,7 +118,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter {
|
|||
*/
|
||||
synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(info.getServerUrl());
|
||||
|
||||
factory.setRedeliveryPolicy(info.redeliveryPolicy());
|
||||
factory.setPrefetchPolicy(info.prefetchPolicy());
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
@ -322,6 +323,86 @@ public class ActiveMQResourceAdapter implements ResourceAdapter {
|
|||
this.brokerXmlConfig=brokerXmlConfig;
|
||||
}
|
||||
|
||||
public Integer getDurableTopicPrefetch() {
|
||||
return info.getDurableTopicPrefetch();
|
||||
}
|
||||
|
||||
public Long getInitialRedeliveryDelay() {
|
||||
return info.getInitialRedeliveryDelay();
|
||||
}
|
||||
|
||||
public Integer getInputStreamPrefetch() {
|
||||
return info.getInputStreamPrefetch();
|
||||
}
|
||||
|
||||
public Integer getMaximumRedeliveries() {
|
||||
return info.getMaximumRedeliveries();
|
||||
}
|
||||
|
||||
public Integer getQueueBrowserPrefetch() {
|
||||
return info.getQueueBrowserPrefetch();
|
||||
}
|
||||
|
||||
public Integer getQueuePrefetch() {
|
||||
return info.getQueuePrefetch();
|
||||
}
|
||||
|
||||
public Short getRedeliveryBackOffMultiplier() {
|
||||
return info.getRedeliveryBackOffMultiplier();
|
||||
}
|
||||
|
||||
public Boolean getRedeliveryUseExponentialBackOff() {
|
||||
return info.getRedeliveryUseExponentialBackOff();
|
||||
}
|
||||
|
||||
public Integer getTopicPrefetch() {
|
||||
return info.getTopicPrefetch();
|
||||
}
|
||||
|
||||
public boolean isUseInboundSessionEnabled() {
|
||||
return info.isUseInboundSessionEnabled();
|
||||
}
|
||||
|
||||
public void setAllPrefetchValues(Integer i) {
|
||||
info.setAllPrefetchValues(i);
|
||||
}
|
||||
|
||||
public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
|
||||
info.setDurableTopicPrefetch(durableTopicPrefetch);
|
||||
}
|
||||
|
||||
public void setInitialRedeliveryDelay(Long value) {
|
||||
info.setInitialRedeliveryDelay(value);
|
||||
}
|
||||
|
||||
public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
|
||||
info.setInputStreamPrefetch(inputStreamPrefetch);
|
||||
}
|
||||
|
||||
public void setMaximumRedeliveries(Integer value) {
|
||||
info.setMaximumRedeliveries(value);
|
||||
}
|
||||
|
||||
public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
|
||||
info.setQueueBrowserPrefetch(queueBrowserPrefetch);
|
||||
}
|
||||
|
||||
public void setQueuePrefetch(Integer queuePrefetch) {
|
||||
info.setQueuePrefetch(queuePrefetch);
|
||||
}
|
||||
|
||||
public void setRedeliveryBackOffMultiplier(Short value) {
|
||||
info.setRedeliveryBackOffMultiplier(value);
|
||||
}
|
||||
|
||||
public void setRedeliveryUseExponentialBackOff(Boolean value) {
|
||||
info.setRedeliveryUseExponentialBackOff(value);
|
||||
}
|
||||
|
||||
public void setTopicPrefetch(Integer topicPrefetch) {
|
||||
info.setTopicPrefetch(topicPrefetch);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the info.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue