r236@34: chirino | 2007-02-23 14:48:10 -0500

Flag a ConnectionContext as being a network connection if it sends us a BrokerInfo.
 Disable flow control if the producer is on a network connection.. trying to get around a network deadlock.
 
 


git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511080 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-02-23 20:22:54 +00:00
parent 81ce0d1b4f
commit 579bc65ae3
5 changed files with 26 additions and 5 deletions

View File

@ -55,6 +55,7 @@ public class ConnectionContext {
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private boolean dontSendReponse;
private boolean networkConnection;
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@ -255,4 +256,12 @@ public class ConnectionContext {
this.dontSendReponse = dontSendReponse;
}
public synchronized boolean isNetworkConnection() {
return networkConnection;
}
public synchronized void setNetworkConnection(boolean networkConnection) {
this.networkConnection = networkConnection;
}
}

View File

@ -125,6 +125,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private CountDownLatch stopLatch = new CountDownLatch(1);
protected final AtomicBoolean asyncException = new AtomicBoolean(false);
private ConnectionContext context;
private boolean networkConnection;
static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
@ -693,6 +694,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
context.setWireFormatInfo(wireFormatInfo);
context.setNetworkConnection(networkConnection);
context.incrementReference();
this.manageable = info.isManageable();
@ -1058,6 +1060,12 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
this.brokerInfo = info;
broker.addBroker(this, info);
networkConnection = true;
for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
ConnectionState cs = (ConnectionState) iter.next();
cs.getContext().setNetworkConnection(true);
}
return null;
}

View File

@ -280,7 +280,7 @@ public class Queue implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception {
if (context.isProducerFlowControl()) {
if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
if( message.isResponseRequired() ) {
if( usageManager.isFull() ) {
// System.out.println("Registering callback...");

View File

@ -232,7 +232,7 @@ public class Topic implements Destination {
public void send(final ConnectionContext context, final Message message) throws Exception {
if (context.isProducerFlowControl()) {
if (context.isProducerFlowControl() && !context.isNetworkConnection() ) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else {

View File

@ -56,9 +56,13 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
}
public boolean matches(MessageEvaluationContext message) throws JMSException{
public boolean matches(MessageEvaluationContext mec) throws JMSException{
try{
return matchesForwardingFilter(message.getMessage());
//for Queues - the message can be acknowledged and dropped whilst still
//in the dispatch loop
//so need to get the reference to it
Message message = mec.getMessage();
return message != null && matchesForwardingFilter(message);
}catch(IOException e){
throw JMSExceptionSupport.create(e);
}
@ -132,4 +136,4 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
this.networkBrokerId = remoteBrokerPath;
}
}
}