From 3d862f0dfb8bc097ff6a4e45e0cc25b06e498e9d Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 5 Jul 2007 12:21:37 +0000 Subject: [PATCH] allow the exclusive queue consumer flag to be defaulted on a connection factory / connection git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@553473 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 17 ++++++- .../activemq/ActiveMQConnectionFactory.java | 49 ++++++++++++------- .../activemq/ActiveMQMessageConsumer.java | 1 + 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 462a92f733..9df1ac06fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -136,6 +136,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean optimizeAcknowledge = false; private boolean nestedMapAndListEnabled = true; private boolean useRetroactiveConsumer; + private boolean exclusiveConsumer; private boolean alwaysSyncSend; private int closeTimeout = 15000; private boolean watchTopicAdvisories=true; @@ -876,6 +877,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.nestedMapAndListEnabled = structuredMapsEnabled; } + public boolean isExclusiveConsumer() { + return exclusiveConsumer; + } + + /** + * Enables or disables whether or not queue consumers should be exclusive or not + * for example to preserve ordering when not using + * Message Groups + * + * @param exclusiveConsumer + */ + public void setExclusiveConsumer(boolean exclusiveConsumer) { + this.exclusiveConsumer = exclusiveConsumer; + } /** * Adds a transport listener so that a client can be notified of events in the underlying @@ -2099,6 +2114,4 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,Message message){ connectionAudit.rollbackDuplicate(dispatcher,message); } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 7ffaca0969..e911579306 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -17,23 +17,6 @@ */ package org.apache.activemq; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.naming.Context; - import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.management.JMSStatsImpl; @@ -47,6 +30,22 @@ import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport.CompositeData; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.Context; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + /** * A ConnectionFactory is an an Administered object, and is used for creating * Connections.

This class also implements QueueConnectionFactory and @@ -87,6 +86,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean optimizeAcknowledge = false; private int closeTimeout = 15000; private boolean useRetroactiveConsumer; + private boolean exclusiveConsumer; private boolean nestedMapAndListEnabled = true; JMSStatsImpl factoryStats = new JMSStatsImpl(); private boolean alwaysSyncSend; @@ -486,6 +486,21 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.useRetroactiveConsumer = useRetroactiveConsumer; } + public boolean isExclusiveConsumer() { + return exclusiveConsumer; + } + + /** + * Enables or disables whether or not queue consumers should be exclusive or not + * for example to preserve ordering when not using + * Message Groups + * + * @param exclusiveConsumer + */ + public void setExclusiveConsumer(boolean exclusiveConsumer) { + this.exclusiveConsumer = exclusiveConsumer; + } + public RedeliveryPolicy getRedeliveryPolicy() { return redeliveryPolicy; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index ea0cfd2ea0..6a93c4e9ad 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -157,6 +157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC setTransformer(session.getTransformer()); this.info = new ConsumerInfo(consumerId); + this.info.setExclusive(this.session.connection.isExclusiveConsumer()); this.info.setSubscriptionName(name); this.info.setPrefetchSize(prefetch); this.info.setCurrentPrefetchSize(prefetch);