From 205ac1188a3961a53e8947e3cb09c5760cd9846a Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 5 Jan 2006 10:44:52 +0000 Subject: [PATCH] allow customisation of the redelivery policy and prefetch policy on the ActiveMQ ResourceAdapter and ActiveMQManagedConnectionFactory and fix AMQ-467 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366152 13f79535-47bb-0310-9956-ffa450edef68 --- .../ra/ActiveMQConnectionRequestInfo.java | 157 +++++++++++++++--- .../ra/ActiveMQManagedConnectionFactory.java | 127 +++++++++++--- .../activemq/ra/ActiveMQResourceAdapter.java | 83 ++++++++- 3 files changed, 320 insertions(+), 47 deletions(-) diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java index d98a7fcc5e..999ffdd841 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.ra; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.RedeliveryPolicy; + import javax.resource.spi.ConnectionRequestInfo; import java.io.Serializable; - /** * @version $Revision$ * @@ -34,17 +36,20 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser private String serverUrl; private String clientid; private Boolean useInboundSession; + private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); public ActiveMQConnectionRequestInfo copy() { try { - return (ActiveMQConnectionRequestInfo) clone(); + ActiveMQConnectionRequestInfo answer = (ActiveMQConnectionRequestInfo) clone(); + answer.redeliveryPolicy = redeliveryPolicy.copy(); + return answer; } catch (CloneNotSupportedException e) { - throw new RuntimeException("Could not clone: ", e); + throw new RuntimeException("Could not clone: " + e, e); } } - /** * @see javax.resource.spi.ConnectionRequestInfo#hashCode() */ @@ -59,7 +64,6 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser return rc; } - /** * @see javax.resource.spi.ConnectionRequestInfo#equals(java.lang.Object) */ @@ -71,16 +75,15 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser return false; } ActiveMQConnectionRequestInfo i = (ActiveMQConnectionRequestInfo) o; - if ( notEqual(serverUrl, i.serverUrl) ) { + if (notEqual(serverUrl, i.serverUrl)) { return false; } - if ( notEqual(useInboundSession, i.useInboundSession) ) { + if (notEqual(useInboundSession, i.useInboundSession)) { return false; } return true; } - /** * @param i * @return @@ -97,7 +100,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser } /** - * @param url The url to set. + * @param url + * The url to set. */ public void setServerUrl(String url) { this.serverUrl = url; @@ -111,7 +115,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser } /** - * @param password The password to set. + * @param password + * The password to set. */ public void setPassword(String password) { this.password = password; @@ -125,7 +130,8 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser } /** - * @param userid The userid to set. + * @param userid + * The userid to set. */ public void setUserName(String userid) { this.userName = userid; @@ -139,34 +145,139 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser } /** - * @param clientid The clientid to set. + * @param clientid + * The clientid to set. */ public void setClientid(String clientid) { this.clientid = clientid; } public String toString() { - return "ActiveMQConnectionRequestInfo{ " + - "userName = '" + userName + "' " + - ", serverUrl = '" + serverUrl + "' " + - ", clientid = '" + clientid + "' " + - ", userName = '" + userName + "' " + - ", useInboundSession = '" + useInboundSession + "' " + - " }"; + return "ActiveMQConnectionRequestInfo{ " + "userName = '" + userName + "' " + ", serverUrl = '" + serverUrl + "' " + ", clientid = '" + clientid + "' " + + ", userName = '" + userName + "' " + ", useInboundSession = '" + useInboundSession + "' " + " }"; } - public Boolean getUseInboundSession() { return useInboundSession; } - public void setUseInboundSession(Boolean useInboundSession) { this.useInboundSession = useInboundSession; } - public boolean isUseInboundSessionEnabled() { - return useInboundSession!=null && useInboundSession.booleanValue(); + return useInboundSession != null && useInboundSession.booleanValue(); + } + + public Short getRedeliveryBackOffMultiplier() { + return new Short(redeliveryPolicy.getBackOffMultiplier()); + } + + public Long getInitialRedeliveryDelay() { + return new Long(redeliveryPolicy.getInitialRedeliveryDelay()); + } + + public Integer getMaximumRedeliveries() { + return new Integer(redeliveryPolicy.getMaximumRedeliveries()); + } + + public Boolean getRedeliveryUseExponentialBackOff() { + return new Boolean(redeliveryPolicy.isUseExponentialBackOff()); + } + + public void setRedeliveryBackOffMultiplier(Short value) { + if (value != null) { + redeliveryPolicy.setBackOffMultiplier(value.shortValue()); + } + } + + public void setInitialRedeliveryDelay(Long value) { + if (value != null) { + redeliveryPolicy.setInitialRedeliveryDelay(value.longValue()); + } + } + + public void setMaximumRedeliveries(Integer value) { + if (value != null) { + redeliveryPolicy.setMaximumRedeliveries(value.intValue()); + } + } + + public void setRedeliveryUseExponentialBackOff(Boolean value) { + if (value != null) { + redeliveryPolicy.setUseExponentialBackOff(value.booleanValue()); + } + } + + public Integer getDurableTopicPrefetch() { + return new Integer(prefetchPolicy.getDurableTopicPrefetch()); + } + + public Integer getInputStreamPrefetch() { + return new Integer(prefetchPolicy.getInputStreamPrefetch()); + } + + public Integer getQueueBrowserPrefetch() { + return new Integer(prefetchPolicy.getQueueBrowserPrefetch()); + } + + public Integer getQueuePrefetch() { + return new Integer(prefetchPolicy.getQueuePrefetch()); + } + + public Integer getTopicPrefetch() { + return new Integer(prefetchPolicy.getTopicPrefetch()); + } + + public void setAllPrefetchValues(Integer i) { + if (i != null) { + prefetchPolicy.setAll(i.intValue()); + } + } + + public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { + if (durableTopicPrefetch != null) { + prefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch.intValue()); + } + } + + public void setInputStreamPrefetch(Integer inputStreamPrefetch) { + if (inputStreamPrefetch != null) { + prefetchPolicy.setInputStreamPrefetch(inputStreamPrefetch.intValue()); + } + } + + public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { + if (queueBrowserPrefetch != null) { + prefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch.intValue()); + } + } + + public void setQueuePrefetch(Integer queuePrefetch) { + if (queuePrefetch != null) { + prefetchPolicy.setQueuePrefetch(queuePrefetch.intValue()); + } + } + + public void setTopicPrefetch(Integer topicPrefetch) { + if (topicPrefetch != null) { + prefetchPolicy.setTopicPrefetch(topicPrefetch.intValue()); + } + } + + /** + * Returns the redelivery policy; not using bean properties to avoid + * breaking compatibility with JCA configuration in J2EE + */ + public RedeliveryPolicy redeliveryPolicy() { + return redeliveryPolicy; + } + + /** + * Returns the prefetch policy; not using bean properties to avoid + * breaking compatibility with JCA configuration in J2EE + */ + public ActiveMQPrefetchPolicy prefetchPolicy() { + return prefetchPolicy; } } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java index c119d5e883..db5379087a 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.java @@ -31,31 +31,30 @@ import javax.resource.spi.ResourceAdapterAssociation; import javax.security.auth.Subject; /** - * @version $Revisio n$ + * @version $Revisio n$ * - * TODO: Must override equals and hashCode (JCA spec 16.4) + * TODO: Must override equals and hashCode (JCA spec 16.4) */ -public class ActiveMQManagedConnectionFactory implements - ManagedConnectionFactory, ResourceAdapterAssociation { +public class ActiveMQManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation { private static final long serialVersionUID = 6196921962230582875L; - + private ActiveMQResourceAdapter adapter; private PrintWriter logWriter; private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); - + public void setResourceAdapter(ResourceAdapter adapter) throws ResourceException { this.adapter = (ActiveMQResourceAdapter) adapter; ActiveMQConnectionRequestInfo baseInfo = this.adapter.getInfo().copy(); - if( info.getClientid()==null ) + if (info.getClientid() == null) info.setClientid(baseInfo.getClientid()); - if( info.getPassword()==null ) + if (info.getPassword() == null) info.setPassword(baseInfo.getPassword()); - if( info.getServerUrl()==null ) + if (info.getServerUrl() == null) info.setServerUrl(baseInfo.getServerUrl()); - if( info.getUseInboundSession()==null ) + if (info.getUseInboundSession() == null) info.setUseInboundSession(baseInfo.getUseInboundSession()); - if( info.getUserName()==null ) + if (info.getUserName() == null) info.setUserName(baseInfo.getUserName()); } @@ -71,11 +70,11 @@ public class ActiveMQManagedConnectionFactory implements } /** - * This is used when not running in an app server. For now we are creating a + * This is used when not running in an app server. For now we are creating a * ConnectionFactory that has our SimpleConnectionManager implementation but - * it may be a better idea to not support this. The JMS api will have many quirks - * the user may not expect when running through the resource adapter. - * + * it may be a better idea to not support this. The JMS api will have many + * quirks the user may not expect when running through the resource adapter. + * * @see javax.resource.spi.ManagedConnectionFactory#createConnectionFactory() */ public Object createConnectionFactory() throws ResourceException { @@ -88,17 +87,18 @@ public class ActiveMQManagedConnectionFactory implements */ public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException { try { - ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo)info; + ActiveMQConnectionRequestInfo amqInfo = (ActiveMQConnectionRequestInfo) info; return new ActiveMQManagedConnection(subject, adapter.makeConnection(amqInfo), amqInfo); - } catch (JMSException e) { + } + catch (JMSException e) { throw new ResourceException("Could not create connection.", e); } } /** * @see javax.resource.spi.ManagedConnectionFactory#matchManagedConnections(java.util.Set, - * javax.security.auth.Subject, - * javax.resource.spi.ConnectionRequestInfo) + * javax.security.auth.Subject, + * javax.resource.spi.ConnectionRequestInfo) */ public ManagedConnection matchManagedConnections(Set connections, Subject subject, ConnectionRequestInfo info) throws ResourceException { Iterator iterator = connections.iterator(); @@ -108,7 +108,8 @@ public class ActiveMQManagedConnectionFactory implements try { c.associate(subject, (ActiveMQConnectionRequestInfo) info); return c; - } catch (JMSException e) { + } + catch (JMSException e) { throw new ResourceException(e); } } @@ -130,12 +131,12 @@ public class ActiveMQManagedConnectionFactory implements return logWriter; } - /////////////////////////////////////////////////////////////////////////// + // ///////////////////////////////////////////////////////////////////////// // // Bean setters and getters. // - /////////////////////////////////////////////////////////////////////////// - + // ///////////////////////////////////////////////////////////////////////// + public String getClientid() { return info.getClientid(); } @@ -175,4 +176,84 @@ public class ActiveMQManagedConnectionFactory implements public void setUseInboundSession(Boolean useInboundSession) { info.setUseInboundSession(useInboundSession); } + + public Long getInitialRedeliveryDelay() { + return info.getInitialRedeliveryDelay(); + } + + public Integer getMaximumRedeliveries() { + return info.getMaximumRedeliveries(); + } + + public Short getRedeliveryBackOffMultiplier() { + return info.getRedeliveryBackOffMultiplier(); + } + + public Boolean getRedeliveryUseExponentialBackOff() { + return info.getRedeliveryUseExponentialBackOff(); + } + + public boolean isUseInboundSessionEnabled() { + return info.isUseInboundSessionEnabled(); + } + + public void setInitialRedeliveryDelay(Long value) { + info.setInitialRedeliveryDelay(value); + } + + public void setMaximumRedeliveries(Integer value) { + info.setMaximumRedeliveries(value); + } + + public void setRedeliveryBackOffMultiplier(Short value) { + info.setRedeliveryBackOffMultiplier(value); + } + + public Integer getDurableTopicPrefetch() { + return info.getDurableTopicPrefetch(); + } + + public Integer getInputStreamPrefetch() { + return info.getInputStreamPrefetch(); + } + + public Integer getQueueBrowserPrefetch() { + return info.getQueueBrowserPrefetch(); + } + + public Integer getQueuePrefetch() { + return info.getQueuePrefetch(); + } + + public Integer getTopicPrefetch() { + return info.getTopicPrefetch(); + } + + public void setAllPrefetchValues(Integer i) { + info.setAllPrefetchValues(i); + } + + public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { + info.setDurableTopicPrefetch(durableTopicPrefetch); + } + + public void setInputStreamPrefetch(Integer inputStreamPrefetch) { + info.setInputStreamPrefetch(inputStreamPrefetch); + } + + public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { + info.setQueueBrowserPrefetch(queueBrowserPrefetch); + } + + public void setQueuePrefetch(Integer queuePrefetch) { + info.setQueuePrefetch(queuePrefetch); + } + + public void setRedeliveryUseExponentialBackOff(Boolean value) { + info.setRedeliveryUseExponentialBackOff(value); + } + + public void setTopicPrefetch(Integer topicPrefetch) { + info.setTopicPrefetch(topicPrefetch); + } } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 5ec50bca17..231fa8260d 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -118,7 +118,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter { */ synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(info.getServerUrl()); - + factory.setRedeliveryPolicy(info.redeliveryPolicy()); + factory.setPrefetchPolicy(info.prefetchPolicy()); return factory; } @@ -322,6 +323,86 @@ public class ActiveMQResourceAdapter implements ResourceAdapter { this.brokerXmlConfig=brokerXmlConfig; } + public Integer getDurableTopicPrefetch() { + return info.getDurableTopicPrefetch(); + } + + public Long getInitialRedeliveryDelay() { + return info.getInitialRedeliveryDelay(); + } + + public Integer getInputStreamPrefetch() { + return info.getInputStreamPrefetch(); + } + + public Integer getMaximumRedeliveries() { + return info.getMaximumRedeliveries(); + } + + public Integer getQueueBrowserPrefetch() { + return info.getQueueBrowserPrefetch(); + } + + public Integer getQueuePrefetch() { + return info.getQueuePrefetch(); + } + + public Short getRedeliveryBackOffMultiplier() { + return info.getRedeliveryBackOffMultiplier(); + } + + public Boolean getRedeliveryUseExponentialBackOff() { + return info.getRedeliveryUseExponentialBackOff(); + } + + public Integer getTopicPrefetch() { + return info.getTopicPrefetch(); + } + + public boolean isUseInboundSessionEnabled() { + return info.isUseInboundSessionEnabled(); + } + + public void setAllPrefetchValues(Integer i) { + info.setAllPrefetchValues(i); + } + + public void setDurableTopicPrefetch(Integer durableTopicPrefetch) { + info.setDurableTopicPrefetch(durableTopicPrefetch); + } + + public void setInitialRedeliveryDelay(Long value) { + info.setInitialRedeliveryDelay(value); + } + + public void setInputStreamPrefetch(Integer inputStreamPrefetch) { + info.setInputStreamPrefetch(inputStreamPrefetch); + } + + public void setMaximumRedeliveries(Integer value) { + info.setMaximumRedeliveries(value); + } + + public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) { + info.setQueueBrowserPrefetch(queueBrowserPrefetch); + } + + public void setQueuePrefetch(Integer queuePrefetch) { + info.setQueuePrefetch(queuePrefetch); + } + + public void setRedeliveryBackOffMultiplier(Short value) { + info.setRedeliveryBackOffMultiplier(value); + } + + public void setRedeliveryUseExponentialBackOff(Boolean value) { + info.setRedeliveryUseExponentialBackOff(value); + } + + public void setTopicPrefetch(Integer topicPrefetch) { + info.setTopicPrefetch(topicPrefetch); + } + /** * @return Returns the info. */