git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1213979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-12-13 23:13:35 +00:00
parent 5f7fc14e2e
commit 802f6b193a
1 changed files with 11 additions and 8 deletions

View File

@ -445,7 +445,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public Response processMessageAck(MessageAck ack) throws Exception {
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
broker.acknowledge(consumerExchange, ack);
if (consumerExchange != null) {
broker.acknowledge(consumerExchange, ack);
}
return null;
}
@ -529,6 +531,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
addConsumerBrokerExchange(info.getConsumerId());
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
@ -703,8 +706,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TransportConnectionState cs = lookupConnectionState(id);
if (cs != null) {
// Don't allow things to be added to the connection state while we
// are
// shutting down.
// are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
@ -757,7 +759,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void dispatchSync(Command message) {
// getStatistics().getEnqueues().increment();
try {
processDispatch(message);
} catch (IOException e) {
@ -767,7 +768,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void dispatchAsync(Command message) {
if (!stopping.get()) {
// getStatistics().getEnqueues().increment();
if (taskRunner == null) {
dispatchSync(message);
} else {
@ -809,13 +809,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
sub.run();
}
}
// getStatistics().getDequeues().increment();
}
}
public boolean iterate() {
try {
if (stopping.get()) {
if (pendingStop || stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
@ -931,7 +930,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
@ -1327,6 +1325,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
ConsumerBrokerExchange result = consumerExchanges.get(id);
return result;
}
private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
ConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
synchronized (consumerExchanges) {