Implemented the per producer flow control on the Topic case too.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@519233 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-17 04:04:43 +00:00
parent a0e92d713b
commit a59cd030f5
2 changed files with 156 additions and 79 deletions

View File

@ -352,73 +352,70 @@ public class Queue implements Destination, Task {
} }
return; return;
} }
if ( context.isProducerFlowControl() ) { if ( context.isProducerFlowControl() && usageManager.isFull() ) {
if( usageManager.isFull() ) { if(usageManager.isSendFailIfNoSpace()){
if(usageManager.isSendFailIfNoSpace()){ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); }
}else{
// We can avoid blocking due to low usage if the producer is sending a sync message or
// We can avoid blocking due to low usage if the producer is sending a sync message or // if it is using a producer window
// if it is using a producer window if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { synchronized( messagesWaitingForSpace ) {
synchronized( messagesWaitingForSpace ) { messagesWaitingForSpace.add(new Runnable() {
messagesWaitingForSpace.add(new Runnable() { public void run() {
public void run() {
// While waiting for space to free up... the message may have expired.
// While waiting for space to free up... the message may have expired. if(message.isExpired()){
if(message.isExpired()){ if (log.isDebugEnabled()) {
if (log.isDebugEnabled()) { log.debug("Expired message: " + message);
log.debug("Expired message: " + message); }
}
if( !message.isResponseRequired() ) {
if( !message.isResponseRequired() ) { ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack);
context.getConnection().dispatchAsync(ack); }
} return;
return; }
}
try {
try { doMessageSend(producerExchange, message);
doMessageSend(producerExchange, message); } catch (Exception e) {
} catch (Exception e) { if( message.isResponseRequired() ) {
if( message.isResponseRequired() ) { ExceptionResponse response = new ExceptionResponse(e);
ExceptionResponse response = new ExceptionResponse(e); response.setCorrelationId(message.getCommandId());
response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response);
context.getConnection().dispatchAsync(response); }
} }
} }
} });
});
// If the user manager is not full, then the task will not get called..
// If the user manager is not full, then the task will not get called.. if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { // so call it directly here.
// so call it directly here. sendMessagesWaitingForSpaceTask.run();
sendMessagesWaitingForSpaceTask.run(); }
} context.setDontSendReponse(true);
context.setDontSendReponse(true); return;
return; }
}
} else {
} else {
// Producer flow control cannot be used, so we have do the flow control at the broker
// Producer flow control cannot be used, so we have do the flow control at the broker // by blocking this thread until there is space available.
// by blocking this thread until there is space available. while( !usageManager.waitForSpace(1000) ) {
while( !usageManager.waitForSpace(1000) ) { if( context.getStopping().get() )
if( context.getStopping().get() ) throw new IOException("Connection closed, send aborted.");
throw new IOException("Connection closed, send aborted."); }
}
// The usage manager could have delayed us by the time
// The usage manager could have delayed us by the time // we unblock the message could have expired..
// we unblock the message could have expired.. if(message.isExpired()){
if(message.isExpired()){ if (log.isDebugEnabled()) {
if (log.isDebugEnabled()) { log.debug("Expired message: " + message);
log.debug("Expired message: " + message); }
} return;
return; }
}
}
}
} }
} }
doMessageSend(producerExchange, message); doMessageSend(producerExchange, message);

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -33,9 +34,11 @@ import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
@ -235,6 +238,22 @@ public class Topic implements Destination {
} }
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
// We may need to do this in async thread since this is run for within a synchronization
// that the UsageManager is holding.
synchronized( messagesWaitingForSpace ) {
while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
}
};
};
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
@ -242,27 +261,89 @@ public class Topic implements Destination {
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
// destination.. it may have expired. // destination.. it may have expired.
if( message.isExpired() ) { if( message.isExpired() ) {
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return; return;
} }
if (context.isProducerFlowControl() && !context.isNetworkConnection() ) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { if ( context.isProducerFlowControl() && usageManager.isFull() ) {
if(usageManager.isSendFailIfNoSpace()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else { }
// We can avoid blocking due to low usage if the producer is sending a sync message or
// if it is using a producer window
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
synchronized( messagesWaitingForSpace ) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
// While waiting for space to free up... the message may have expired.
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
try {
doMessageSend(producerExchange, message);
} catch (Exception e) {
if( message.isResponseRequired() ) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
}
}
});
// If the user manager is not full, then the task will not get called..
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
context.setDontSendReponse(true);
return;
}
} else {
// Producer flow control cannot be used, so we have do the flow control at the broker
// by blocking this thread until there is space available.
while( !usageManager.waitForSpace(1000) ) { while( !usageManager.waitForSpace(1000) ) {
if( context.getStopping().get() ) if( context.getStopping().get() )
throw new IOException("Connection closed, send aborted."); throw new IOException("Connection closed, send aborted.");
} }
usageManager.waitForSpace();
// The usage manager could have delayed us by the time // The usage manager could have delayed us by the time
// we unblock the message could have expired.. // we unblock the message could have expired..
if( message.isExpired() ) { if(message.isExpired()){
return; if (log.isDebugEnabled()) {
} log.debug("Expired message: " + message);
} }
return;
}
}
} }
message.setRegionDestination(this); doMessageSend(producerExchange, message);
}
private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if (store != null && message.isPersistent() && !canOptimizeOutPersistence() ) if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
store.addMessage(context, message); store.addMessage(context, message);
@ -292,8 +373,7 @@ public class Topic implements Destination {
finally { finally {
message.decrementReferenceCount(); message.decrementReferenceCount();
} }
}
}
private boolean canOptimizeOutPersistence() { private boolean canOptimizeOutPersistence() {
return durableSubcribers.size()==0; return durableSubcribers.size()==0;