diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 10da8ad960..b59efd566a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -55,6 +55,7 @@ public class ConnectionContext { private MessageAuthorizationPolicy messageAuthorizationPolicy; private AtomicInteger referenceCounter = new AtomicInteger(); private boolean dontSendReponse; + private boolean networkConnection; private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); @@ -255,4 +256,12 @@ public class ConnectionContext { this.dontSendReponse = dontSendReponse; } + public synchronized boolean isNetworkConnection() { + return networkConnection; + } + + public synchronized void setNetworkConnection(boolean networkConnection) { + this.networkConnection = networkConnection; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index f926bb9de3..e490e3d23b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -125,6 +125,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi private CountDownLatch stopLatch = new CountDownLatch(1); protected final AtomicBoolean asyncException = new AtomicBoolean(false); private ConnectionContext context; + private boolean networkConnection; static class ConnectionState extends org.apache.activemq.state.ConnectionState { private final ConnectionContext context; @@ -693,6 +694,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi context.setUserName(info.getUserName()); context.setConnectionId(info.getConnectionId()); context.setWireFormatInfo(wireFormatInfo); + context.setNetworkConnection(networkConnection); context.incrementReference(); this.manageable = info.isManageable(); @@ -1058,6 +1060,12 @@ public class TransportConnection implements Service, Connection, Task, CommandVi this.brokerInfo = info; broker.addBroker(this, info); + networkConnection = true; + for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) { + ConnectionState cs = (ConnectionState) iter.next(); + cs.getContext().setNetworkConnection(true); + } + return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 422fcf712a..111a970039 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -280,7 +280,7 @@ public class Queue implements Destination { public void send(final ConnectionContext context, final Message message) throws Exception { - if (context.isProducerFlowControl()) { + if (context.isProducerFlowControl() && !context.isNetworkConnection()) { if( message.isResponseRequired() ) { if( usageManager.isFull() ) { // System.out.println("Registering callback..."); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index a06a5aaea1..e81d5d2223 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -232,7 +232,7 @@ public class Topic implements Destination { public void send(final ConnectionContext context, final Message message) throws Exception { - if (context.isProducerFlowControl()) { + if (context.isProducerFlowControl() && !context.isNetworkConnection() ) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); } else { diff --git a/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java index 7598d36ee6..60a6d8f693 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java @@ -56,9 +56,13 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } - public boolean matches(MessageEvaluationContext message) throws JMSException{ + public boolean matches(MessageEvaluationContext mec) throws JMSException{ try{ - return matchesForwardingFilter(message.getMessage()); + //for Queues - the message can be acknowledged and dropped whilst still + //in the dispatch loop + //so need to get the reference to it + Message message = mec.getMessage(); + return message != null && matchesForwardingFilter(message); }catch(IOException e){ throw JMSExceptionSupport.create(e); } @@ -132,4 +136,4 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { this.networkBrokerId = remoteBrokerPath; } -} \ No newline at end of file +}