allow the exclusive queue consumer flag to be defaulted on a connection factory / connection

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@553473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-07-05 12:21:37 +00:00
parent 94f3e17d7b
commit 3d862f0dfb
3 changed files with 48 additions and 19 deletions

View File

@ -136,6 +136,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean optimizeAcknowledge = false; private boolean optimizeAcknowledge = false;
private boolean nestedMapAndListEnabled = true; private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer; private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean alwaysSyncSend; private boolean alwaysSyncSend;
private int closeTimeout = 15000; private int closeTimeout = 15000;
private boolean watchTopicAdvisories=true; private boolean watchTopicAdvisories=true;
@ -876,6 +877,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.nestedMapAndListEnabled = structuredMapsEnabled; this.nestedMapAndListEnabled = structuredMapsEnabled;
} }
public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}
/**
* Enables or disables whether or not queue consumers should be exclusive or not
* for example to preserve ordering when not using
* <a href="http://activemq.apache.org/message-groups.html">Message Groups</a>
*
* @param exclusiveConsumer
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
/** /**
* Adds a transport listener so that a client can be notified of events in the underlying * Adds a transport listener so that a client can be notified of events in the underlying
@ -2099,6 +2114,4 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,Message message){ protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,Message message){
connectionAudit.rollbackDuplicate(dispatcher,message); connectionAudit.rollbackDuplicate(dispatcher,message);
} }
} }

View File

@ -17,23 +17,6 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
@ -47,6 +30,22 @@ import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData; import org.apache.activemq.util.URISupport.CompositeData;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/** /**
* A ConnectionFactory is an an Administered object, and is used for creating * A ConnectionFactory is an an Administered object, and is used for creating
* Connections. <p/> This class also implements QueueConnectionFactory and * Connections. <p/> This class also implements QueueConnectionFactory and
@ -87,6 +86,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean optimizeAcknowledge = false; private boolean optimizeAcknowledge = false;
private int closeTimeout = 15000; private int closeTimeout = 15000;
private boolean useRetroactiveConsumer; private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean nestedMapAndListEnabled = true; private boolean nestedMapAndListEnabled = true;
JMSStatsImpl factoryStats = new JMSStatsImpl(); JMSStatsImpl factoryStats = new JMSStatsImpl();
private boolean alwaysSyncSend; private boolean alwaysSyncSend;
@ -486,6 +486,21 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.useRetroactiveConsumer = useRetroactiveConsumer; this.useRetroactiveConsumer = useRetroactiveConsumer;
} }
public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}
/**
* Enables or disables whether or not queue consumers should be exclusive or not
* for example to preserve ordering when not using
* <a href="http://activemq.apache.org/message-groups.html">Message Groups</a>
*
* @param exclusiveConsumer
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
public RedeliveryPolicy getRedeliveryPolicy() { public RedeliveryPolicy getRedeliveryPolicy() {
return redeliveryPolicy; return redeliveryPolicy;
} }

View File

@ -157,6 +157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
setTransformer(session.getTransformer()); setTransformer(session.getTransformer());
this.info = new ConsumerInfo(consumerId); this.info = new ConsumerInfo(consumerId);
this.info.setExclusive(this.session.connection.isExclusiveConsumer());
this.info.setSubscriptionName(name); this.info.setSubscriptionName(name);
this.info.setPrefetchSize(prefetch); this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch); this.info.setCurrentPrefetchSize(prefetch);