diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index dd513ff134..d4f32acc56 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -109,7 +109,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public final ConcurrentHashMap activeTempDestinations = new ConcurrentHashMap(); - protected boolean dispatchAsync; + protected boolean dispatchAsync=true; protected boolean alwaysSessionAsync = true; private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000); @@ -293,7 +293,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon checkClosedOrFailed(); ensureConnectionInfoSent(); return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED - ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), dispatchAsync, alwaysSessionAsync); + ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); } /** @@ -694,7 +694,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon info.setSubscriptionName(subscriptionName); info.setSelector(messageSelector); info.setPrefetchSize(maxMessages); - info.setDispatchAsync(dispatchAsync); + info.setDispatchAsync(isDispatchAsync()); // Allows the options on the destination to configure the consumerInfo if (info.getDestination().getOptions() != null) { @@ -1094,7 +1094,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon info.setSelector(messageSelector); info.setPrefetchSize(maxMessages); info.setNoLocal(noLocal); - info.setDispatchAsync(dispatchAsync); + info.setDispatchAsync(isDispatchAsync()); // Allows the options on the destination to configure the consumerInfo if (info.getDestination().getOptions() != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 73e955eac3..879247ad61 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -76,8 +76,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne protected String userName; protected String password; protected String clientID; - protected boolean dispatchAsync; - protected boolean alwaysSessionAsync = true; + protected boolean dispatchAsync=true; + protected boolean alwaysSessionAsync=true; JMSStatsImpl factoryStats = new JMSStatsImpl(); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index d4dbbf43f9..5c890179ce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1002,7 +1002,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, - prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch, messageListener); + prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java index e23f6a9ba2..2b56e5dc12 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java @@ -69,6 +69,6 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); - return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync); + return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync()); } }