diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index b46672ac60..bfa511843c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -54,7 +54,7 @@ public class ConnectionContext { private boolean producerFlowControl=true; private MessageAuthorizationPolicy messageAuthorizationPolicy; private AtomicInteger referenceCounter = new AtomicInteger(); - + private boolean networkConnection; private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); public ConnectionContext() { @@ -246,4 +246,11 @@ public class ConnectionContext { return referenceCounter.decrementAndGet(); } + public synchronized boolean isNetworkConnection() { + return networkConnection; + } + + public synchronized void setNetworkConnection(boolean networkConnection) { + this.networkConnection = networkConnection; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index dc33a54cf5..dcfd2d3584 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -119,7 +119,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private final MapconsumerExchanges = new HashMap(); private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); protected AtomicBoolean dispatchStopped=new AtomicBoolean(false); - + private boolean networkConnection; + static class ConnectionState extends org.apache.activemq.state.ConnectionState{ private final ConnectionContext context; @@ -627,6 +628,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit context.setUserName(info.getUserName()); context.setConnectionId(info.getConnectionId()); context.setWireFormatInfo(wireFormatInfo); + context.setNetworkConnection(networkConnection); context.incrementReference(); this.manageable=info.isManageable(); state=new ConnectionState(info,context,this); @@ -1027,6 +1029,12 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } this.brokerInfo=info; broker.addBroker(this,info); + networkConnection = true; + for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) { + ConnectionState cs = (ConnectionState) iter.next(); + cs.getContext().setNetworkConnection(true); + } + return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index c25573a5de..1fa0db487f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -325,7 +325,7 @@ public class Queue implements Destination, Task { } return; } - if(context.isProducerFlowControl()){ + if (context.isProducerFlowControl() && !context.isNetworkConnection()) { if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); }else{ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index c49077d394..6cf90a33c8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -243,7 +243,7 @@ public class Topic implements Destination { if( message.isExpired() ) { return; } - if (context.isProducerFlowControl()) { + if (context.isProducerFlowControl() && !context.isNetworkConnection() ) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); } else {