mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@645999 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
af27811a75
commit
62fb85e6b9
|
@ -109,7 +109,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
|
|
||||||
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
|
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
|
||||||
|
|
||||||
protected boolean dispatchAsync;
|
protected boolean dispatchAsync=true;
|
||||||
protected boolean alwaysSessionAsync = true;
|
protected boolean alwaysSessionAsync = true;
|
||||||
|
|
||||||
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
|
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
|
||||||
|
@ -293,7 +293,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
checkClosedOrFailed();
|
checkClosedOrFailed();
|
||||||
ensureConnectionInfoSent();
|
ensureConnectionInfoSent();
|
||||||
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
|
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
|
||||||
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), dispatchAsync, alwaysSessionAsync);
|
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -694,7 +694,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
info.setSubscriptionName(subscriptionName);
|
info.setSubscriptionName(subscriptionName);
|
||||||
info.setSelector(messageSelector);
|
info.setSelector(messageSelector);
|
||||||
info.setPrefetchSize(maxMessages);
|
info.setPrefetchSize(maxMessages);
|
||||||
info.setDispatchAsync(dispatchAsync);
|
info.setDispatchAsync(isDispatchAsync());
|
||||||
|
|
||||||
// Allows the options on the destination to configure the consumerInfo
|
// Allows the options on the destination to configure the consumerInfo
|
||||||
if (info.getDestination().getOptions() != null) {
|
if (info.getDestination().getOptions() != null) {
|
||||||
|
@ -1094,7 +1094,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
info.setSelector(messageSelector);
|
info.setSelector(messageSelector);
|
||||||
info.setPrefetchSize(maxMessages);
|
info.setPrefetchSize(maxMessages);
|
||||||
info.setNoLocal(noLocal);
|
info.setNoLocal(noLocal);
|
||||||
info.setDispatchAsync(dispatchAsync);
|
info.setDispatchAsync(isDispatchAsync());
|
||||||
|
|
||||||
// Allows the options on the destination to configure the consumerInfo
|
// Allows the options on the destination to configure the consumerInfo
|
||||||
if (info.getDestination().getOptions() != null) {
|
if (info.getDestination().getOptions() != null) {
|
||||||
|
|
|
@ -76,8 +76,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
protected String userName;
|
protected String userName;
|
||||||
protected String password;
|
protected String password;
|
||||||
protected String clientID;
|
protected String clientID;
|
||||||
protected boolean dispatchAsync;
|
protected boolean dispatchAsync=true;
|
||||||
protected boolean alwaysSessionAsync = true;
|
protected boolean alwaysSessionAsync=true;
|
||||||
|
|
||||||
JMSStatsImpl factoryStats = new JMSStatsImpl();
|
JMSStatsImpl factoryStats = new JMSStatsImpl();
|
||||||
|
|
||||||
|
|
|
@ -1002,7 +1002,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
}
|
}
|
||||||
ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
|
ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
|
||||||
return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
|
return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
|
||||||
prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch, messageListener);
|
prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -69,6 +69,6 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC
|
||||||
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
checkClosedOrFailed();
|
checkClosedOrFailed();
|
||||||
ensureConnectionInfoSent();
|
ensureConnectionInfoSent();
|
||||||
return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync);
|
return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue