mirror of https://github.com/apache/activemq.git
ported fix to trunk :
http://issues.apache.org/activemq/browse/AMQ-1181 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@514720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e5b45f6b4b
commit
49ea0eddf6
|
@ -54,7 +54,7 @@ public class ConnectionContext {
|
|||
private boolean producerFlowControl=true;
|
||||
private MessageAuthorizationPolicy messageAuthorizationPolicy;
|
||||
private AtomicInteger referenceCounter = new AtomicInteger();
|
||||
|
||||
private boolean networkConnection;
|
||||
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
|
||||
|
||||
public ConnectionContext() {
|
||||
|
@ -246,4 +246,11 @@ public class ConnectionContext {
|
|||
return referenceCounter.decrementAndGet();
|
||||
}
|
||||
|
||||
public synchronized boolean isNetworkConnection() {
|
||||
return networkConnection;
|
||||
}
|
||||
|
||||
public synchronized void setNetworkConnection(boolean networkConnection) {
|
||||
this.networkConnection = networkConnection;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -119,6 +119,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
|
||||
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
|
||||
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
|
||||
private boolean networkConnection;
|
||||
|
||||
static class ConnectionState extends org.apache.activemq.state.ConnectionState{
|
||||
|
||||
|
@ -627,6 +628,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
context.setUserName(info.getUserName());
|
||||
context.setConnectionId(info.getConnectionId());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
context.setNetworkConnection(networkConnection);
|
||||
context.incrementReference();
|
||||
this.manageable=info.isManageable();
|
||||
state=new ConnectionState(info,context,this);
|
||||
|
@ -1027,6 +1029,12 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
}
|
||||
this.brokerInfo=info;
|
||||
broker.addBroker(this,info);
|
||||
networkConnection = true;
|
||||
for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
|
||||
ConnectionState cs = (ConnectionState) iter.next();
|
||||
cs.getContext().setNetworkConnection(true);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -325,7 +325,7 @@ public class Queue implements Destination, Task {
|
|||
}
|
||||
return;
|
||||
}
|
||||
if(context.isProducerFlowControl()){
|
||||
if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
|
||||
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
}else{
|
||||
|
|
|
@ -243,7 +243,7 @@ public class Topic implements Destination {
|
|||
if( message.isExpired() ) {
|
||||
return;
|
||||
}
|
||||
if (context.isProducerFlowControl()) {
|
||||
if (context.isProducerFlowControl() && !context.isNetworkConnection() ) {
|
||||
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue