For optimized acknowledge, eagerly get acknowledgements from consumers

when the dispatched list gets too big.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@394050 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-14 08:38:50 +00:00
parent ef276909f1
commit 6eaea6f336
5 changed files with 184 additions and 29 deletions

View File

@ -46,6 +46,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.*;
/**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@ -111,7 +112,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
private boolean optimizeAcknowledge;
private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService = null;
/**
* Create a MessageConsumer
@ -160,6 +163,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.info = new ConsumerInfo(consumerId);
this.info.setSubcriptionName(name);
this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch);
this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
this.info.setNoLocal(noLocal);
this.info.setDispatchAsync(dispatchAsync);
@ -182,9 +186,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
&&!info.isBrowser());
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
&&!info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
@ -516,20 +520,35 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
void deliverAcks(){
synchronized(optimizeAcknowledge){
if(this.optimizeAcknowledge.get()){
MessageAck ack=null;
if(deliveryingAcknowledgements.compareAndSet(false,true)){
if(this.optimizeAcknowledge){
if(!deliveredMessages.isEmpty()){
MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
try{
session.asyncSendPacket(ack);
}catch(JMSException e){
log.error("Failed to delivered acknowledgements",e);
}
ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
deliveredMessages.clear();
ackCounter=0;
}
}
if(ack!=null){
final MessageAck ackToSend=ack;
if(executorService==null){
executorService=Executors.newSingleThreadExecutor();
}
executorService.submit(new Runnable(){
public void run(){
try{
session.asyncSendPacket(ackToSend);
}catch(JMSException e){
log.error("Failed to delivered acknowledgements",e);
}finally{
deliveryingAcknowledgements.set(false);
}
}
});
}else{
deliveryingAcknowledgements.set(false);
}
}
}
@ -539,6 +558,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
deliverAcks();//only processes optimized acknowledgements
if (executorService!=null){
executorService.shutdown();
}
if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
acknowledge();
}
@ -562,15 +584,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
protected void setOptimizeAcknowledge(boolean value){
synchronized(optimizeAcknowledge){
if (optimizeAcknowledge && !value){
deliverAcks();
optimizeAcknowledge.set(value);
}
optimizeAcknowledge=value;
}
protected void setPrefetchSize(int prefetch){
deliverAcks();
this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch);
}
private void beforeMessageIsConsumed(MessageDispatch md) {
@ -590,20 +612,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else if(session.isAutoAcknowledge()){
if(!deliveredMessages.isEmpty()){
synchronized(optimizeAcknowledge){
if(this.optimizeAcknowledge.get()){
if(optimizeAcknowledge){
if(deliveryingAcknowledgements.compareAndSet(false,true)){
ackCounter++;
if(ackCounter>=(info.getPrefetchSize()*.75)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
if(ackCounter>=(info.getCurrentPrefetchSize()*.75)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
ackCounter=0;
deliveredMessages.clear();
}
}else{
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
deliveryingAcknowledgements.set(false);
}
}else{
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
}
}
}else if(session.isDupsOkAcknowledge()){
@ -697,12 +720,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void rollback() throws JMSException{
synchronized(unconsumedMessages.getMutex()){
synchronized(optimizeAcknowledge){
if(optimizeAcknowledge.get()){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
}
if(optimizeAcknowledge){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
}
}
if(deliveredMessages.isEmpty())

View File

@ -24,6 +24,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -62,6 +63,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
optimizePrefetch();
synchronized(pending){
if( pending.isEmpty() )
if (log.isDebugEnabled()){
@ -210,6 +212,20 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
}
synchronized public int getPendingQueueSize(){
return pending.size();
}
@ -229,6 +245,26 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized public long getEnqueueCounter() {
return enqueueCounter;
}
/**
* optimize message consumer prefetch if the consumer supports it
*
*/
public void optimizePrefetch(){
if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
&&context.getConnection().isManageable()){
if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
info.setCurrentPrefetchSize(info.getPrefetchSize());
updateConsumerPrefetch(info.getPrefetchSize());
}else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
// want to purge any outstanding acks held by the consumer
info.setCurrentPrefetchSize(1);
updateConsumerPrefetch(1);
}
}
}
protected void dispatchMatched() throws IOException{
@ -289,6 +325,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
}
}
/**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch){
if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
ConsumerControl cc = new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
}
}
/**
* @param node

View File

@ -148,4 +148,26 @@ public interface Subscription {
* Set when the subscription is registered in JMX
*/
public void setObjectName(ObjectName objectName);
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark();
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark();
/**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch);
/**
* optimize message consumer prefetch if the consumer supports it
*
*/
public void optimizePrefetch();
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -64,6 +65,7 @@ public class TopicSubscription extends AbstractSubscription{
enqueueCounter++;
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
@ -231,6 +233,51 @@ public class TopicSubscription extends AbstractSubscription{
private boolean isFull(){
return dispatched.get()-delivered.get()>=info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
}
/**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch){
if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
ConsumerControl cc = new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
}
}
/**
* optimize message consumer prefetch if the consumer supports it
*
*/
public void optimizePrefetch(){
if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
&&context.getConnection().isManageable()){
if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
info.setCurrentPrefetchSize(info.getPrefetchSize());
updateConsumerPrefetch(info.getPrefetchSize());
}else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
// want to purge any outstanding acks held by the consumer
info.setCurrentPrefetchSize(1);
updateConsumerPrefetch(1);
}
}
}
private void dispatchMatched() throws IOException{
synchronized(matchedListMutex){

View File

@ -49,6 +49,7 @@ public class ConsumerInfo extends BaseCommand {
protected byte priority;
protected BrokerId[] brokerPath;
protected boolean optimizedAcknowledge;
protected transient int currentPrefetchSize;//used by the broker
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; //this subscription originated from a network connection
@ -144,6 +145,7 @@ public class ConsumerInfo extends BaseCommand {
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
this.currentPrefetchSize = prefetchSize;
}
/**
@ -322,4 +324,18 @@ public class ConsumerInfo extends BaseCommand {
this.optimizedAcknowledge=optimizedAcknowledge;
}
/**
* @return Returns the currentPrefetchSize.
*/
public int getCurrentPrefetchSize(){
return currentPrefetchSize;
}
/**
* @param currentPrefetchSize The currentPrefetchSize to set.
*/
public void setCurrentPrefetchSize(int currentPrefetchSize){
this.currentPrefetchSize=currentPrefetchSize;
}
}