From a59cd030f51dbffba541e1fe38dc329c01d214cf Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Sat, 17 Mar 2007 04:04:43 +0000 Subject: [PATCH] Implemented the per producer flow control on the Topic case too. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@519233 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 131 +++++++++--------- .../apache/activemq/broker/region/Topic.java | 104 ++++++++++++-- 2 files changed, 156 insertions(+), 79 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 3bbac238bb..d59b014071 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -352,73 +352,70 @@ public class Queue implements Destination, Task { } return; } - if ( context.isProducerFlowControl() ) { - if( usageManager.isFull() ) { - if(usageManager.isSendFailIfNoSpace()){ - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); - }else{ - - // We can avoid blocking due to low usage if the producer is sending a sync message or - // if it is using a producer window - if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { - synchronized( messagesWaitingForSpace ) { - messagesWaitingForSpace.add(new Runnable() { - public void run() { - - // While waiting for space to free up... the message may have expired. - if(message.isExpired()){ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } - - if( !message.isResponseRequired() ) { - ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); - context.getConnection().dispatchAsync(ack); - } - return; - } - - - try { - doMessageSend(producerExchange, message); - } catch (Exception e) { - if( message.isResponseRequired() ) { - ExceptionResponse response = new ExceptionResponse(e); - response.setCorrelationId(message.getCommandId()); - context.getConnection().dispatchAsync(response); - } - } - } - }); - - // If the user manager is not full, then the task will not get called.. - if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { - // so call it directly here. - sendMessagesWaitingForSpaceTask.run(); - } - context.setDontSendReponse(true); - return; - } - - } else { - - // Producer flow control cannot be used, so we have do the flow control at the broker - // by blocking this thread until there is space available. - while( !usageManager.waitForSpace(1000) ) { - if( context.getStopping().get() ) - throw new IOException("Connection closed, send aborted."); - } - - // The usage manager could have delayed us by the time - // we unblock the message could have expired.. - if(message.isExpired()){ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } - return; - } - } - } + if ( context.isProducerFlowControl() && usageManager.isFull() ) { + if(usageManager.isSendFailIfNoSpace()){ + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); + } + + // We can avoid blocking due to low usage if the producer is sending a sync message or + // if it is using a producer window + if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { + synchronized( messagesWaitingForSpace ) { + messagesWaitingForSpace.add(new Runnable() { + public void run() { + + // While waiting for space to free up... the message may have expired. + if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + + if( !message.isResponseRequired() ) { + ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); + context.getConnection().dispatchAsync(ack); + } + return; + } + + + try { + doMessageSend(producerExchange, message); + } catch (Exception e) { + if( message.isResponseRequired() ) { + ExceptionResponse response = new ExceptionResponse(e); + response.setCorrelationId(message.getCommandId()); + context.getConnection().dispatchAsync(response); + } + } + } + }); + + // If the user manager is not full, then the task will not get called.. + if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { + // so call it directly here. + sendMessagesWaitingForSpaceTask.run(); + } + context.setDontSendReponse(true); + return; + } + + } else { + + // Producer flow control cannot be used, so we have do the flow control at the broker + // by blocking this thread until there is space available. + while( !usageManager.waitForSpace(1000) ) { + if( context.getStopping().get() ) + throw new IOException("Connection closed, send aborted."); + } + + // The usage manager could have delayed us by the time + // we unblock the message could have expired.. + if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + return; + } } } doMessageSend(producerExchange, message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index e3f7794582..c55665655c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.LinkedList; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -33,9 +34,11 @@ import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.memory.UsageManager; @@ -235,6 +238,22 @@ public class Topic implements Destination { } + private final LinkedList messagesWaitingForSpace = new LinkedList(); + private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { + public void run() { + + // We may need to do this in async thread since this is run for within a synchronization + // that the UsageManager is holding. + + synchronized( messagesWaitingForSpace ) { + while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) { + Runnable op = messagesWaitingForSpace.removeFirst(); + op.run(); + } + } + + }; + }; public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { final ConnectionContext context = producerExchange.getConnectionContext(); @@ -242,27 +261,89 @@ public class Topic implements Destination { // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if( message.isExpired() ) { + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { + ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); + context.getConnection().dispatchAsync(ack); + } return; } - if (context.isProducerFlowControl() && !context.isNetworkConnection() ) { - if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { + + if ( context.isProducerFlowControl() && usageManager.isFull() ) { + if(usageManager.isSendFailIfNoSpace()){ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); - } else { + } + + // We can avoid blocking due to low usage if the producer is sending a sync message or + // if it is using a producer window + if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { + synchronized( messagesWaitingForSpace ) { + messagesWaitingForSpace.add(new Runnable() { + public void run() { + + // While waiting for space to free up... the message may have expired. + if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + + if( !message.isResponseRequired() ) { + ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); + context.getConnection().dispatchAsync(ack); + } + return; + } + + + try { + doMessageSend(producerExchange, message); + } catch (Exception e) { + if( message.isResponseRequired() ) { + ExceptionResponse response = new ExceptionResponse(e); + response.setCorrelationId(message.getCommandId()); + context.getConnection().dispatchAsync(response); + } + } + } + }); + + // If the user manager is not full, then the task will not get called.. + if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { + // so call it directly here. + sendMessagesWaitingForSpaceTask.run(); + } + context.setDontSendReponse(true); + return; + } + + } else { + + // Producer flow control cannot be used, so we have do the flow control at the broker + // by blocking this thread until there is space available. while( !usageManager.waitForSpace(1000) ) { if( context.getStopping().get() ) throw new IOException("Connection closed, send aborted."); - } - usageManager.waitForSpace(); + } // The usage manager could have delayed us by the time // we unblock the message could have expired.. - if( message.isExpired() ) { - return; - } - } + if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + return; + } + } } - message.setRegionDestination(this); + doMessageSend(producerExchange, message); + } + + private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + final ConnectionContext context = producerExchange.getConnectionContext(); + message.setRegionDestination(this); if (store != null && message.isPersistent() && !canOptimizeOutPersistence() ) store.addMessage(context, message); @@ -292,8 +373,7 @@ public class Topic implements Destination { finally { message.decrementReferenceCount(); } - - } + } private boolean canOptimizeOutPersistence() { return durableSubcribers.size()==0;