Jonas B. Lim 2007-03-05 15:41:17 +00:00
parent ef1d3559f5
commit 015cd8a7f9
6 changed files with 140 additions and 74 deletions

View File

@ -106,4 +106,6 @@ public interface Connection extends Service {
public void serviceExceptionAsync(IOException e); public void serviceExceptionAsync(IOException e);
public String getConnectionId();
} }

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.ConnectionStatistics;
@ -97,7 +98,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
// Used to do async dispatch.. this should perhaps be pushed down into the transport layer.. // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
protected final List dispatchQueue=Collections.synchronizedList(new LinkedList()); protected final List dispatchQueue=Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner; protected final TaskRunner taskRunner;
protected IOException transportException; protected final AtomicReference transportException = new AtomicReference();
private boolean inServiceException=false; private boolean inServiceException=false;
private ConnectionStatistics statistics=new ConnectionStatistics(); private ConnectionStatistics statistics=new ConnectionStatistics();
private boolean manageable; private boolean manageable;
@ -116,6 +117,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private final AtomicBoolean asyncException=new AtomicBoolean(false); private final AtomicBoolean asyncException=new AtomicBoolean(false);
private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>(); private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>();
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>(); private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
static class ConnectionState extends org.apache.activemq.state.ConnectionState{ static class ConnectionState extends org.apache.activemq.state.ConnectionState{
@ -166,7 +169,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
Command command=(Command)o; Command command=(Command)o;
Response response=service(command); Response response=service(command);
if(response!=null){ if(response!=null){
dispatch(response); dispatchSync(response);
} }
} }
@ -186,7 +189,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public void serviceTransportException(IOException e){ public void serviceTransportException(IOException e){
if(!disposed.get()){ if(!disposed.get()){
transportException=e; transportException.set(e);
if(transportLog.isDebugEnabled()) if(transportLog.isDebugEnabled())
transportLog.debug("Transport failed: "+e,e); transportLog.debug("Transport failed: "+e,e);
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
@ -683,10 +686,17 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
} }
public void dispatchSync(Command message){ public void dispatchSync(Command message){
getStatistics().getEnqueues().increment();
try {
processDispatch(message); processDispatch(message);
} catch (IOException e) {
serviceExceptionAsync(e);
}
} }
public void dispatchAsync(Command message){ public void dispatchAsync(Command message){
if( !disposed.get() ) {
getStatistics().getEnqueues().increment();
if( taskRunner==null ) { if( taskRunner==null ) {
dispatchSync( message ); dispatchSync( message );
} else { } else {
@ -697,33 +707,75 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
} } else {
if(message.isMessageDispatch()) {
protected void processDispatch(Command command){ MessageDispatch md=(MessageDispatch) message;
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch)command;
Runnable sub=(Runnable) md.getConsumer(); Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md); broker.processDispatch(md);
try{
dispatch(command);
}finally{
if(sub!=null){ if(sub!=null){
sub.run(); sub.run();
} }
} }
}else{
dispatch(command);
} }
} }
protected void processDispatch(Command command) throws IOException {
try {
if( !disposed.get() ) {
dispatch(command);
}
} finally {
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
getStatistics().getDequeues().increment();
}
}
public boolean iterate(){ public boolean iterate(){
if(dispatchQueue.isEmpty()||broker.isStopped()){ try {
if( disposed.get() ) {
if( dispatchStopped.compareAndSet(false, true)) {
if( transportException.get()==null ) {
try {
dispatch(new ShutdownInfo());
} catch (Throwable ignore) {
}
}
dispatchStoppedLatch.countDown();
}
return false;
}
if( !dispatchStopped.get() ) {
if( dispatchQueue.isEmpty() ) {
return false; return false;
} else { } else {
Command command = (Command) dispatchQueue.remove(0); Command command = (Command) dispatchQueue.remove(0);
processDispatch( command ); processDispatch( command );
return true; return true;
} }
} else {
return false;
}
} catch (IOException e) {
if( dispatchStopped.compareAndSet(false, true)) {
dispatchStoppedLatch.countDown();
}
serviceExceptionAsync(e);
return false;
}
} }
/** /**
@ -792,11 +844,24 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
transport.stop(); transport.stop();
active=false; active=false;
if(disposed.compareAndSet(false,true)){ if(disposed.compareAndSet(false,true)){
taskRunner.wakeup();
dispatchStoppedLatch.await();
if( taskRunner!=null ) if( taskRunner!=null )
taskRunner.shutdown(); taskRunner.shutdown();
// Clear out the dispatch queue to release any memory that
// is being held on to. // Run the MessageDispatch callbacks so that message references get cleaned up.
dispatchQueue.clear(); for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
}
// //
// Remove all logical connection associated with this connection // Remove all logical connection associated with this connection
// from the broker. // from the broker.
@ -965,13 +1030,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return null; return null;
} }
protected void dispatch(Command command){ protected void dispatch(Command command) throws IOException{
try{ try{
setMarkedCandidate(true); setMarkedCandidate(true);
transport.oneway(command); transport.oneway(command);
getStatistics().onCommand(command);
}catch(IOException e){
serviceExceptionAsync(e);
}finally{ }finally{
setMarkedCandidate(false); setMarkedCandidate(false);
} }
@ -981,6 +1043,17 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return transport.getRemoteAddress(); return transport.getRemoteAddress();
} }
public String getConnectionId() {
Iterator iterator = localConnectionStates.values().iterator();
ConnectionState object = (ConnectionState) iterator.next();
if( object == null ) {
return null;
}
if( object.getInfo().getClientId() !=null )
return object.getInfo().getClientId();
return object.getInfo().getConnectionId().toString();
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){ private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
ProducerBrokerExchange result=producerExchanges.get(id); ProducerBrokerExchange result=producerExchanges.get(id);
if(result==null){ if(result==null){

View File

@ -102,4 +102,8 @@ public class ConnectionView implements ConnectionViewMBean {
return connection.getRemoteAddress(); return connection.getRemoteAddress();
} }
public String getConnectionId() {
return connection.getConnectionId();
}
} }

View File

@ -18,8 +18,7 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Message;
import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
@ -73,16 +72,5 @@ public class ConnectionStatistics extends StatsImpl {
} }
} }
/**
* Updates the statistics as a command is dispatched into the connection
*/
public void onCommand(Command command) {
if (command.isMessageDispatch()) {
enqueues.increment();
}
}
public void onMessageDequeue(Message message) {
dequeues.increment();
}
} }

View File

@ -453,7 +453,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
if(node!=QueueMessageReference.NULL_MESSAGE){ if(node!=QueueMessageReference.NULL_MESSAGE){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
context.getConnection().getStatistics().onMessageDequeue(message);
} }
try{ try{
dispatchMatched(); dispatchMatched();

View File

@ -44,8 +44,8 @@ public class TopicSubscription extends AbstractSubscription{
private static final AtomicLong cursorNameCounter=new AtomicLong(0); private static final AtomicLong cursorNameCounter=new AtomicLong(0);
protected PendingMessageCursor matched; protected PendingMessageCursor matched;
final protected UsageManager usageManager; final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong(); protected AtomicLong dispatchedCounter=new AtomicLong();
protected AtomicLong delivered=new AtomicLong(); protected AtomicLong prefetchExtension=new AtomicLong();
private int maximumPendingMessages=-1; private int maximumPendingMessages=-1;
private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy(); private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
private int discarded=0; private int discarded=0;
@ -136,7 +136,7 @@ public class TopicSubscription extends AbstractSubscription{
MessageReference node=matched.next(); MessageReference node=matched.next();
if(node.isExpired()){ if(node.isExpired()){
matched.remove(); matched.remove();
dispatched.incrementAndGet(); dispatchedCounter.incrementAndGet();
node.decrementReferenceCount(); node.decrementReferenceCount();
break; break;
} }
@ -154,7 +154,7 @@ public class TopicSubscription extends AbstractSubscription{
MessageReference node=matched.next(); MessageReference node=matched.next();
if(node.getMessageId().equals(mdn.getMessageId())){ if(node.getMessageId().equals(mdn.getMessageId())){
matched.remove(); matched.remove();
dispatched.incrementAndGet(); dispatchedCounter.incrementAndGet();
node.decrementReferenceCount(); node.decrementReferenceCount();
break; break;
} }
@ -170,7 +170,7 @@ public class TopicSubscription extends AbstractSubscription{
boolean wasFull=isFull(); boolean wasFull=isFull();
if(ack.isStandardAck()||ack.isPoisonAck()){ if(ack.isStandardAck()||ack.isPoisonAck()){
if(context.isInTransaction()){ if(context.isInTransaction()){
delivered.addAndGet(ack.getMessageCount()); prefetchExtension.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){ context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{ public void afterCommit() throws Exception{
@ -180,8 +180,7 @@ public class TopicSubscription extends AbstractSubscription{
} }
} }
dequeueCounter.addAndGet(ack.getMessageCount()); dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount()); prefetchExtension.addAndGet(ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
} }
}); });
}else{ }else{
@ -189,8 +188,7 @@ public class TopicSubscription extends AbstractSubscription{
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
} }
dequeueCounter.addAndGet(ack.getMessageCount()); dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount()); prefetchExtension.addAndGet(ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
} }
if(wasFull&&!isFull()){ if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
@ -198,7 +196,7 @@ public class TopicSubscription extends AbstractSubscription{
return; return;
}else if(ack.isDeliveredAck()){ }else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters. // Message was delivered but not acknowledged: update pre-fetch counters.
delivered.addAndGet(ack.getMessageCount()); prefetchExtension.addAndGet(ack.getMessageCount());
if(wasFull&&!isFull()){ if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
} }
@ -217,7 +215,7 @@ public class TopicSubscription extends AbstractSubscription{
} }
public int getDispatchedQueueSize(){ public int getDispatchedQueueSize(){
return (int)(dispatched.get()-delivered.get()); return (int)(dispatchedCounter.get()-dequeueCounter.get());
} }
public int getMaximumPendingMessages(){ public int getMaximumPendingMessages(){
@ -225,7 +223,7 @@ public class TopicSubscription extends AbstractSubscription{
} }
public long getDispatchedCounter(){ public long getDispatchedCounter(){
return dispatched.get(); return dispatchedCounter.get();
} }
public long getEnqueueCounter(){ public long getEnqueueCounter(){
@ -277,21 +275,21 @@ public class TopicSubscription extends AbstractSubscription{
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
private boolean isFull(){ private boolean isFull(){
return dispatched.get()-delivered.get()>=info.getPrefetchSize(); return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize();
} }
/** /**
* @return true when 60% or more room is left for dispatching messages * @return true when 60% or more room is left for dispatching messages
*/ */
public boolean isLowWaterMark(){ public boolean isLowWaterMark(){
return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4); return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4);
} }
/** /**
* @return true when 10% or less room is left for dispatching messages * @return true when 10% or less room is left for dispatching messages
*/ */
public boolean isHighWaterMark(){ public boolean isHighWaterMark(){
return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9); return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9);
} }
/** /**
@ -386,7 +384,7 @@ public class TopicSubscription extends AbstractSubscription{
md.setMessage(message); md.setMessage(message);
md.setConsumerId(info.getConsumerId()); md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination()); md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatched.incrementAndGet(); dispatchedCounter.incrementAndGet();
// Keep track if this subscription is receiving messages from a single destination. // Keep track if this subscription is receiving messages from a single destination.
if(singleDestination){ if(singleDestination){
if(destination==null){ if(destination==null){
@ -429,6 +427,8 @@ public class TopicSubscription extends AbstractSubscription{
} }
} }
public int getPrefetchSize() {
return (int) (info.getPrefetchSize() + prefetchExtension.get());
}
} }