Adding the bits need to do producer flow control with a window to the broker. Just implemented on the Queue case for now.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@516475 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-09 17:29:30 +00:00
parent 8e7fa346f7
commit 4207bd9cc0
7 changed files with 261 additions and 68 deletions

View File

@ -17,6 +17,9 @@
*/
package org.apache.activemq;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@ -36,9 +39,6 @@ import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.IntrospectionSupport;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* A client uses a <CODE>MessageProducer</CODE> object to send messages to a
* destination. A <CODE>MessageProducer</CODE> object is created by passing a
@ -496,11 +496,7 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C
}
}
int size = this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
if( producerWindow!=null ) {
producerWindow.increaseUsage(size);
}
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow);
stats.onMessage();
}

View File

@ -17,25 +17,9 @@
*/
package org.apache.activemq;
import org.apache.activemq.command.*;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import java.io.Serializable;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
@ -43,6 +27,68 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing
@ -1546,11 +1592,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* message priority.
* @param timeToLive -
* message expiration.
* @param producerWindow
* @throws JMSException
*/
protected int send(ActiveMQMessageProducer producer,
protected void send(ActiveMQMessageProducer producer,
ActiveMQDestination destination,Message message,int deliveryMode,
int priority,long timeToLive) throws JMSException{
int priority,long timeToLive, UsageManager producerWindow) throws JMSException{
checkClosed();
if(destination.isTemporary()&&connection.isDeleted(destination)){
throw new JMSException("Cannot publish to a deleted Destination: "
@ -1598,15 +1646,18 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
this.connection.asyncSendPacket(msg);
if( producerWindow!=null ) {
// Since we defer lots of the marshaling till we hit the wire, this might not
// provide and accurate size. We may change over to doing more aggressive marshaling,
// to get more accurate sizes.. this is more important once users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
}else{
this.connection.syncSendPacket(msg);
}
// Since we defer lots of the marshaling till we hit the wire, this might not
// provide and accurate size. We may change over to doing more aggressive marshaling,
// to get more accurate sizes.. this is more important once users start using producer window
// flow control.
return msg.getSize();
}
}

View File

@ -58,6 +58,7 @@ public class ConnectionContext {
private boolean networkConnection;
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
private boolean dontSendReponse;
public ConnectionContext() {
}
@ -258,6 +259,14 @@ public class ConnectionContext {
public AtomicBoolean getStopping() {
return stopping;
}
public void setDontSendReponse(boolean b) {
this.dontSendReponse=b;
}
public boolean isDontSendReponse() {
return dontSendReponse;
}
}

View File

@ -124,6 +124,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
private ConnectionContext context;
private boolean networkConnection;
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
@ -284,6 +285,16 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
response.setCorrelationId(commandId);
}
// The context may have been flagged so that the response is not sent.
if( context!=null ) {
if( context.isDontSendReponse() ) {
context.setDontSendReponse(false);
response=null;
}
context=null;
}
return response;
}
@ -344,7 +355,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -365,7 +376,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -388,7 +399,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -399,7 +410,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -410,7 +421,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -421,7 +432,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -431,7 +442,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
context=null;
if(cs!=null){
context=cs.getContext();
}
@ -626,7 +637,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
log.debug("Setting up new connection: "+this);
// Setup the context.
String clientId=info.getClientId();
ConnectionContext context=new ConnectionContext();
context=new ConnectionContext();
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
@ -1096,7 +1107,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized(producerExchanges){
result=new ProducerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
ConnectionContext context=state.getContext();
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){
@ -1125,7 +1136,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized(consumerExchanges){
result=new ConsumerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
ConnectionContext context=state.getContext();
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){

View File

@ -42,9 +42,12 @@ import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
@ -78,7 +81,6 @@ public class Queue implements Destination, Task {
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
private PendingMessageCursor messages;
private final LinkedList pagedInMessages = new LinkedList();
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
@ -95,7 +97,7 @@ public class Queue implements Destination, Task {
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private boolean started = false;
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
@ -318,6 +320,23 @@ public class Queue implements Destination, Task {
}
}
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
// We may need to do this in async thread since this is run for within a synchronization
// that the UsageManager is holding.
synchronized( messagesWaitingForSpace ) {
while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
}
};
};
public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@ -327,27 +346,88 @@ public class Queue implements Destination, Task {
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
while( !usageManager.waitForSpace(1000) ) {
if( context.getStopping().get() )
throw new IOException("Connection closed, send aborted.");
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
return;
}
}
if ( context.isProducerFlowControl() ) {
if( usageManager.isFull() ) {
if(usageManager.isSendFailIfNoSpace()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
// We can avoid blocking due to low usage if the producer is sending a sync message or
// if it is using a producer window
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
synchronized( messagesWaitingForSpace ) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
try {
doMessageSend(producerExchange, message);
if( message.isResponseRequired() ) {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} else {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
} catch (Exception e) {
if( message.isResponseRequired() ) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} else {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
}
}
});
// If the user manager is not full, then the task will not get called..
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
context.setDontSendReponse(true);
return;
}
} else {
// Producer flow control cannot be used, so we have do the flow control at the broker
// by blocking this thread until there is space available.
while( !usageManager.waitForSpace(1000) ) {
if( context.getStopping().get() )
throw new IOException("Connection closed, send aborted.");
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
}
}
}
}
message.setRegionDestination(this);
doMessageSend(producerExchange, message);
}
private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if(store!=null&&message.isPersistent()){
store.addMessage(context,message);
}
@ -361,12 +441,16 @@ public class Queue implements Destination, Task {
messages.addMessageLast(message);
}
// It could take while before we receive the commit
// operration.. by that time the message could have expired..
// op, by that time the message could have expired..
if(message.isExpired()){
// TODO: remove message from store.
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
sendMessage(context,message);
@ -379,9 +463,7 @@ public class Queue implements Destination, Task {
sendMessage(context,message);
}
}
}
public void dispose(ConnectionContext context) throws IOException {
if (store != null) {

View File

@ -38,6 +38,11 @@ public class ProducerAck extends BaseCommand {
public ProducerAck() {
}
public ProducerAck(ProducerId producerId, int size) {
this.producerId = producerId;
this.size = size;
}
public void copy(ProducerAck copy) {
super.copy(copy);
copy.producerId = producerId;

View File

@ -18,14 +18,14 @@
package org.apache.activemq.memory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Used to keep track of how much of something is being used so that
@ -60,6 +60,7 @@ public class UsageManager implements Service{
private String name = "";
private float usagePortion = 1.0f;
private List<UsageManager> children = new CopyOnWriteArrayList<UsageManager>();
private final LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
public UsageManager() {
this(null,"default");
@ -292,6 +293,11 @@ public class UsageManager implements Service{
if(oldPercentUsage>=100&&newPercentUsage<100){
synchronized(usageMutex){
usageMutex.notifyAll();
for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
Runnable callback = (Runnable) iter.next();
callback.run();
}
callbacks.clear();
}
}
// Let the listeners know
@ -331,4 +337,37 @@ public class UsageManager implements Service{
private void removeChild(UsageManager child){
children.remove(child);
}
/**
* @param callback
* @return true if the UsageManager was full. The callback will only be called if this method returns true.
*/
public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
if(parent!=null) {
Runnable r = new Runnable(){
public void run() {
synchronized (usageMutex) {
if( percentUsage >= 100 ) {
callbacks.add(callback);
} else {
callback.run();
}
}
}
};
if( parent.notifyCallbackWhenNotFull(r) ) {
return true;
}
}
synchronized (usageMutex) {
if( percentUsage >= 100 ) {
callbacks.add(callback);
return true;
} else {
return false;
}
}
}
}