fix for AMQ-792 to allow the async dispatch of messages to consumers to be easily configured & properly documented the javadoc. For more detail see http://activemq.org/site/consumer-dispatch-async.html

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418966 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-07-04 09:54:38 +00:00
parent 9f3fcde566
commit 480433bd74
4 changed files with 67 additions and 23 deletions

View File

@ -122,7 +122,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean copyMessageOnSend = true; private boolean copyMessageOnSend = true;
private boolean useCompression = false; private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false; private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = false; protected boolean dispatchAsync = false;
protected boolean alwaysSessionAsync=true; protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false; private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = false; private boolean optimizeAcknowledge = false;
@ -274,7 +274,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE; ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
:(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)), :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
asyncDispatch,alwaysSessionAsync); dispatchAsync,alwaysSessionAsync);
} }
/** /**
@ -674,7 +674,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setSubcriptionName(subscriptionName); info.setSubcriptionName(subscriptionName);
info.setSelector(messageSelector); info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages); info.setPrefetchSize(maxMessages);
info.setDispatchAsync(asyncDispatch); info.setDispatchAsync(dispatchAsync);
// Allows the options on the destination to configure the consumerInfo // Allows the options on the destination to configure the consumerInfo
if( info.getDestination().getOptions()!=null ) { if( info.getDestination().getOptions()!=null ) {
@ -727,8 +727,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
/** /**
* @param prefetchPolicy * Sets the <a
* The prefetchPolicy to set. * href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
* policy</a> for consumers created by this connection.
*/ */
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy; this.prefetchPolicy = prefetchPolicy;
@ -1031,7 +1032,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setSelector(messageSelector); info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages); info.setPrefetchSize(maxMessages);
info.setNoLocal(noLocal); info.setNoLocal(noLocal);
info.setDispatchAsync(asyncDispatch); info.setDispatchAsync(dispatchAsync);
// Allows the options on the destination to configure the consumerInfo // Allows the options on the destination to configure the consumerInfo
if( info.getDestination().getOptions()!=null ) { if( info.getDestination().getOptions()!=null ) {
@ -1358,10 +1359,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/** /**
* @param alwaysSessionAsync The alwaysSessionAsync to set. * If this flag is set then a separate thread is not used for dispatching
* messages for each Session in the Connection. However, a separate thread
* is always used if there is more than one session, or the session isn't in
* auto acknowledge or duplicates ok mode
*
* @param alwaysSessionAsync
* The alwaysSessionAsync to set.
*/ */
public void setAlwaysSessionAsync(boolean alwaysSessionAsync){ public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
this.alwaysSessionAsync=alwaysSessionAsync; this.alwaysSessionAsync = alwaysSessionAsync;
} }
/** /**
@ -1603,12 +1610,28 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
public boolean isAsyncDispatch() { public boolean isDispatchAsync() {
return asyncDispatch; return dispatchAsync;
} }
public void setAsyncDispatch(boolean asyncDispatch) { /**
this.asyncDispatch = asyncDispatch; * Enables or disables the default setting of whether or not consumers have
* their messages <a
* href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
* synchronously or asynchronously by the broker</a>.
*
* For non-durable topics for example we typically dispatch synchronously by
* default to minimize context switches which boost performance. However
* sometimes its better to go slower to ensure that a single blocked
* consumer socket does not block delivery to other consumers.
*
* @param asyncDispatch
* If true then consumers created on this connection will default
* to having their messages dispatched asynchronously. The
* default value is false.
*/
public void setDispatchAsync(boolean asyncDispatch) {
this.dispatchAsync = asyncDispatch;
} }
public boolean isObjectMessageSerializationDefered() { public boolean isObjectMessageSerializationDefered() {

View File

@ -74,7 +74,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean copyMessageOnSend = true; private boolean copyMessageOnSend = true;
private boolean useCompression = false; private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false; private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = false; protected boolean dispatchAsync = false;
protected boolean alwaysSessionAsync=true; protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false; private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = false; private boolean optimizeAcknowledge = false;
@ -227,7 +227,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setCopyMessageOnSend(isCopyMessageOnSend()); connection.setCopyMessageOnSend(isCopyMessageOnSend());
connection.setUseCompression(isUseCompression()); connection.setUseCompression(isUseCompression());
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setAsyncDispatch(isAsyncDispatch()); connection.setDispatchAsync(isDispatchAsync());
connection.setUseAsyncSend(isUseAsyncSend()); connection.setUseAsyncSend(isUseAsyncSend());
connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
@ -337,6 +337,11 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
return prefetchPolicy; return prefetchPolicy;
} }
/**
* Sets the <a
* href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
* policy</a> for consumers created by this connection.
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy; this.prefetchPolicy = prefetchPolicy;
} }
@ -419,7 +424,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
} }
public void populateProperties(Properties props) { public void populateProperties(Properties props) {
props.setProperty("asyncDispatch", Boolean.toString(isAsyncDispatch())); props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
if (getBrokerURL() != null) { if (getBrokerURL() != null) {
props.setProperty(Context.PROVIDER_URL, getBrokerURL()); props.setProperty(Context.PROVIDER_URL, getBrokerURL());
@ -472,12 +477,28 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.objectMessageSerializationDefered = objectMessageSerializationDefered; this.objectMessageSerializationDefered = objectMessageSerializationDefered;
} }
public boolean isAsyncDispatch() { public boolean isDispatchAsync() {
return asyncDispatch; return dispatchAsync;
} }
public void setAsyncDispatch(boolean asyncDispatch) { /**
this.asyncDispatch = asyncDispatch; * Enables or disables the default setting of whether or not consumers have
* their messages <a
* href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
* synchronously or asynchronously by the broker</a>.
*
* For non-durable topics for example we typically dispatch synchronously by
* default to minimize context switches which boost performance. However
* sometimes its better to go slower to ensure that a single blocked
* consumer socket does not block delivery to other consumers.
*
* @param asyncDispatch
* If true then consumers created on this connection will default
* to having their messages dispatched asynchronously. The
* default value is false.
*/
public void setDispatchAsync(boolean asyncDispatch) {
this.dispatchAsync = asyncDispatch;
} }
/** /**

View File

@ -75,6 +75,6 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
ensureConnectionInfoSent(); ensureConnectionInfoSent();
return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch); return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync);
} }
} }

View File

@ -27,7 +27,7 @@ public class ObjectFactoryTest extends CombinationTestSupport {
public void testConnectionFactory() throws Exception { public void testConnectionFactory() throws Exception {
// Create sample connection factory // Create sample connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setAsyncDispatch(false); factory.setDispatchAsync(false);
factory.setBrokerURL("vm://test"); factory.setBrokerURL("vm://test");
factory.setClientID("test"); factory.setClientID("test");
factory.setCopyMessageOnSend(false); factory.setCopyMessageOnSend(false);
@ -53,7 +53,7 @@ public class ObjectFactoryTest extends CombinationTestSupport {
temp = (ActiveMQConnectionFactory)refFactory.getObjectInstance(ref, null, null, null); temp = (ActiveMQConnectionFactory)refFactory.getObjectInstance(ref, null, null, null);
// Check settings // Check settings
assertEquals(factory.isAsyncDispatch(), temp.isAsyncDispatch()); assertEquals(factory.isDispatchAsync(), temp.isDispatchAsync());
assertEquals(factory.getBrokerURL(), temp.getBrokerURL()); assertEquals(factory.getBrokerURL(), temp.getBrokerURL());
assertEquals(factory.getClientID(), temp.getClientID()); assertEquals(factory.getClientID(), temp.getClientID());
assertEquals(factory.isCopyMessageOnSend(), temp.isCopyMessageOnSend()); assertEquals(factory.isCopyMessageOnSend(), temp.isCopyMessageOnSend());