go back to less granular synchronization

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-01-04 08:59:12 +00:00
parent 481fc1ee3a
commit 158dbc66e7
1 changed files with 276 additions and 319 deletions

View File

@ -1,27 +1,22 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@ -45,6 +40,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A subscription that honors the pre-fetch option of the ConsumerInfo.
*
@ -55,18 +51,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
private AtomicBoolean dispatching = new AtomicBoolean();
private AtomicBoolean dispatching=new AtomicBoolean();
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor)
throws InvalidSelectorException{
super(broker,context,info);
pending = cursor;
pending=cursor;
}
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
@ -74,34 +68,32 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
this(broker,context,info,new VMPendingMessageCursor());
}
/**
* Allows a message to be pulled on demand by a client
*/
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
public synchronized Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception{
// The slave should not deliver pull messages. TODO: when the slave becomes a master,
// He should send a NULL message to all the consumers to 'wake them up' in case
// they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlaveBroker()) {
if(getPrefetchSize()==0&&!isSlaveBroker()){
prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter;
final long dispatchCounterBeforePull=dispatchCounter;
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if( dispatchCounterBeforePull == dispatchCounter ) {
if(dispatchCounterBeforePull==dispatchCounter){
// imediate timeout used by receiveNoWait()
if( pull.getTimeout() == -1 ) {
if(pull.getTimeout()==-1){
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
}
if( pull.getTimeout() > 0 ) {
if(pull.getTimeout()>0){
Scheduler.executeAfterDelay(new Runnable(){
public void run() {
public void run(){
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
},pull.getTimeout());
}
}
}
@ -109,27 +101,25 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
/**
* Occurs when a pull times out. If nothing has been dispatched
* since the timeout was setup, then send the NULL message.
* Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL
* message.
*/
private void pullTimeout(long dispatchCounterBeforePull) {
if( dispatchCounterBeforePull == dispatchCounter ) {
try {
private void pullTimeout(long dispatchCounterBeforePull){
if(dispatchCounterBeforePull==dispatchCounter){
try{
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
} catch (Exception e) {
}catch(Exception e){
context.getConnection().serviceException(e);
}
}
}
public void add(MessageReference node) throws Exception{
public synchronized void add(MessageReference node) throws Exception{
boolean pendingEmpty=false;
synchronized(pending){
pendingEmpty=pending.isEmpty();
enqueueCounter++;
}
if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
dispatch(node);
}else{
@ -140,14 +130,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
pending.addMessageLast(node);
}
//we might be able to dispatch messages (i.e. not full() anymore)
dispatchMatched();
}
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
synchronized(pending){
public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
try{
pending.reset();
while(pending.hasNext()){
@ -156,7 +142,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pending.remove();
createMessageDispatch(node,node.getMessage());
dispatched.addLast(node);
return;
}
}
@ -166,12 +151,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
+") was not in the pending list");
}
}
public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
public synchronized void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case.
boolean callDispatchMatched=false;
synchronized(dispatched){
if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
@ -196,8 +179,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized(PrefetchSubscription.this){
dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues()
.increment();
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
prefetchExtension--;
}
}
@ -274,7 +256,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}
}
}
if(callDispatchMatched){
dispatchMatched();
}else{
@ -293,7 +274,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @throws IOException
* @throws Exception
*/
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{
// Send the message to the DLQ
Message message=node.getMessage();
if(message!=null){
@ -301,40 +282,41 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// sent,
// it is only populated if the message is routed to another destination like the DLQ
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context, message, deadLetterDestination);
ActiveMQDestination deadLetterDestination=deadLetterStrategy
.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context,message,deadLetterDestination);
}
}
/**
* Used to determine if the broker can dispatch to the consumer.
*
* @return
*/
protected boolean isFull(){
return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize();
protected synchronized boolean isFull(){
return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4);
return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
}
public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size();
public synchronized int countBeforeFull(){
return info.getPrefetchSize()+prefetchExtension-dispatched.size();
}
public int getPendingQueueSize(){
synchronized(pending) {
synchronized(pending){
return pending.size();
}
}
@ -349,11 +331,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return dequeueCounter;
}
synchronized public long getDispatchedCounter() {
synchronized public long getDispatchedCounter(){
return dispatchCounter;
}
synchronized public long getEnqueueCounter() {
synchronized public long getEnqueueCounter(){
return enqueueCounter;
}
@ -367,40 +349,28 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/
public void optimizePrefetch(){
/*
if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
&&context.getConnection().isManageable()){
if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
info.setCurrentPrefetchSize(info.getPrefetchSize());
updateConsumerPrefetch(info.getPrefetchSize());
}else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
// want to purge any outstanding acks held by the consumer
info.setCurrentPrefetchSize(1);
updateConsumerPrefetch(1);
}
}
* if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
* &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
* isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
* updateConsumerPrefetch(info.getPrefetchSize()); }else
* if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
* outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
*/
}
public void add(ConnectionContext context,Destination destination) throws Exception{
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
super.add(context,destination);
synchronized(pending){
pending.add(context,destination);
}
}
public void remove(ConnectionContext context,Destination destination) throws Exception{
public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
super.remove(context,destination);
synchronized(pending){
pending.remove(context,destination);
}
}
protected void dispatchMatched() throws IOException{
if(!broker.isSlaveBroker() && dispatching.compareAndSet(false,true)){
if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){
try{
List toDispatch=null;
synchronized(pending){
try{
int numberToDispatch=countBeforeFull();
if(numberToDispatch>0){
@ -408,9 +378,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pending.reset();
while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
MessageReference node=pending.next();
if ( node == null )
if(node==null)
break;
if(canDispatch(node)){
pending.remove();
// Message may have been sitting in the pending list a while
@ -418,10 +387,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
continue; // just drop it.
}
if(toDispatch==null){
toDispatch=new ArrayList();
}
toDispatch.add(node);
dispatch(node);
count++;
}
}
@ -429,15 +395,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}finally{
pending.release();
}
}
if(toDispatch!=null){
synchronized(dispatched){
for(int i=0;i<toDispatch.size();i++){
MessageReference node=(MessageReference)toDispatch.get(i);
dispatch(node);
}
}
}
}finally{
dispatching.set(false);
}
@ -449,7 +406,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(message==null){
return false;
}
synchronized(dispatched){
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
MessageDispatch md=createMessageDispatch(node,message);
@ -474,17 +430,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
context.getConnection().dispatchSync(md);
onDispatch(node,message);
}
//System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
}else{
QueueMessageReference n = (QueueMessageReference) node;
QueueMessageReference n=(QueueMessageReference)node;
return false;
}
}
}
protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){
if( node != QueueMessageReference.NULL_MESSAGE ) {
if(node!=QueueMessageReference.NULL_MESSAGE){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
context.getConnection().getStatistics().onMessageDequeue(message);
}
@ -498,11 +454,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
/**
* inform the MessageConsumer on the client to change it's prefetch
*
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch){
if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
ConsumerControl cc = new ConsumerControl();
if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
ConsumerControl cc=new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
@ -515,13 +472,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @return MessageDispatch
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
if( node == QueueMessageReference.NULL_MESSAGE ) {
if(node==QueueMessageReference.NULL_MESSAGE){
MessageDispatch md=new MessageDispatch();
md.setMessage(null);
md.setConsumerId(info.getConsumerId());
md.setDestination( null );
md.setDestination(null);
return md;
} else {
}else{
MessageDispatch md=new MessageDispatch();
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
@ -547,6 +504,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @throws IOException
*/
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{}
throws IOException{
}
}