From 015cd8a7f92cb245977f1d502ffc401932426095 Mon Sep 17 00:00:00 2001 From: "Jonas B. Lim" Date: Mon, 5 Mar 2007 15:41:17 +0000 Subject: [PATCH] ported fix to trunk : http://issues.apache.org/activemq/browse/AMQ-1172 http://issues.apache.org/activemq/browse/AMQ-1174 http://issues.apache.org/activemq/browse/AMQ-1175 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@514694 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/Connection.java | 2 + .../activemq/broker/TransportConnection.java | 159 +++++++++++++----- .../activemq/broker/jmx/ConnectionView.java | 4 + .../broker/region/ConnectionStatistics.java | 14 +- .../broker/region/PrefetchSubscription.java | 1 - .../broker/region/TopicSubscription.java | 34 ++-- 6 files changed, 140 insertions(+), 74 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java b/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java index 25b2cdad7b..5c0ea4ff9a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java @@ -105,5 +105,7 @@ public interface Connection extends Service { public String getRemoteAddress(); public void serviceExceptionAsync(IOException e); + + public String getConnectionId(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 809a1f925d..dc33a54cf5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.Service; import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.region.ConnectionStatistics; @@ -97,7 +98,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit // Used to do async dispatch.. this should perhaps be pushed down into the transport layer.. protected final List dispatchQueue=Collections.synchronizedList(new LinkedList()); protected final TaskRunner taskRunner; - protected IOException transportException; + protected final AtomicReference transportException = new AtomicReference(); private boolean inServiceException=false; private ConnectionStatistics statistics=new ConnectionStatistics(); private boolean manageable; @@ -116,6 +117,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private final AtomicBoolean asyncException=new AtomicBoolean(false); private final MapproducerExchanges = new HashMap(); private final MapconsumerExchanges = new HashMap(); + private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); + protected AtomicBoolean dispatchStopped=new AtomicBoolean(false); static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -166,7 +169,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit Command command=(Command)o; Response response=service(command); if(response!=null){ - dispatch(response); + dispatchSync(response); } } @@ -186,7 +189,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public void serviceTransportException(IOException e){ if(!disposed.get()){ - transportException=e; + transportException.set(e); if(transportLog.isDebugEnabled()) transportLog.debug("Transport failed: "+e,e); ServiceSupport.dispose(this); @@ -683,47 +686,96 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } public void dispatchSync(Command message){ - processDispatch(message); - } - - public void dispatchAsync(Command message){ - if(taskRunner==null){ - dispatchSync(message); - }else{ - dispatchQueue.add(message); - try{ - taskRunner.wakeup(); - }catch(InterruptedException e){ - Thread.currentThread().interrupt(); - } + getStatistics().getEnqueues().increment(); + try { + processDispatch(message); + } catch (IOException e) { + serviceExceptionAsync(e); } } - protected void processDispatch(Command command){ - if(command.isMessageDispatch()){ - MessageDispatch md=(MessageDispatch)command; - Runnable sub=(Runnable)md.getConsumer(); - broker.processDispatch(md); - try{ - dispatch(command); - }finally{ + public void dispatchAsync(Command message){ + if( !disposed.get() ) { + getStatistics().getEnqueues().increment(); + if( taskRunner==null ) { + dispatchSync( message ); + } else { + dispatchQueue.add(message); + try { + taskRunner.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } else { + if(message.isMessageDispatch()) { + MessageDispatch md=(MessageDispatch) message; + Runnable sub=(Runnable) md.getConsumer(); + broker.processDispatch(md); + if(sub!=null){ + sub.run(); + } + } + } + } + + protected void processDispatch(Command command) throws IOException { + try { + if( !disposed.get() ) { + dispatch(command); + } + } finally { + + if(command.isMessageDispatch()){ + MessageDispatch md=(MessageDispatch) command; + Runnable sub=(Runnable) md.getConsumer(); + broker.processDispatch(md); if(sub!=null){ sub.run(); } } - }else{ - dispatch(command); + + getStatistics().getDequeues().increment(); } - } + } + + public boolean iterate(){ - if(dispatchQueue.isEmpty()||broker.isStopped()){ - return false; - }else{ - Command command=(Command)dispatchQueue.remove(0); - processDispatch(command); - return true; - } + try { + if( disposed.get() ) { + if( dispatchStopped.compareAndSet(false, true)) { + if( transportException.get()==null ) { + try { + dispatch(new ShutdownInfo()); + } catch (Throwable ignore) { + } + } + dispatchStoppedLatch.countDown(); + } + return false; + } + + if( !dispatchStopped.get() ) { + + if( dispatchQueue.isEmpty() ) { + return false; + } else { + Command command = (Command) dispatchQueue.remove(0); + processDispatch( command ); + return true; + } + } else { + return false; + } + + } catch (IOException e) { + if( dispatchStopped.compareAndSet(false, true)) { + dispatchStoppedLatch.countDown(); + } + serviceExceptionAsync(e); + return false; + } } /** @@ -792,11 +844,24 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit transport.stop(); active=false; if(disposed.compareAndSet(false,true)){ - if(taskRunner!=null) - taskRunner.shutdown(); - // Clear out the dispatch queue to release any memory that - // is being held on to. - dispatchQueue.clear(); + taskRunner.wakeup(); + dispatchStoppedLatch.await(); + + if( taskRunner!=null ) + taskRunner.shutdown(); + + // Run the MessageDispatch callbacks so that message references get cleaned up. + for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { + Command command = (Command) iter.next(); + if(command.isMessageDispatch()) { + MessageDispatch md=(MessageDispatch) command; + Runnable sub=(Runnable) md.getConsumer(); + broker.processDispatch(md); + if(sub!=null){ + sub.run(); + } + } + } // // Remove all logical connection associated with this connection // from the broker. @@ -965,13 +1030,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } - protected void dispatch(Command command){ + protected void dispatch(Command command) throws IOException{ try{ setMarkedCandidate(true); transport.oneway(command); - getStatistics().onCommand(command); - }catch(IOException e){ - serviceExceptionAsync(e); }finally{ setMarkedCandidate(false); } @@ -981,6 +1043,17 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return transport.getRemoteAddress(); } + public String getConnectionId() { + Iterator iterator = localConnectionStates.values().iterator(); + ConnectionState object = (ConnectionState) iterator.next(); + if( object == null ) { + return null; + } + if( object.getInfo().getClientId() !=null ) + return object.getInfo().getClientId(); + return object.getInfo().getConnectionId().toString(); + } + private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){ ProducerBrokerExchange result=producerExchanges.get(id); if(result==null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java index b7998e2f94..99b2058bae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java @@ -102,4 +102,8 @@ public class ConnectionView implements ConnectionViewMBean { return connection.getRemoteAddress(); } + public String getConnectionId() { + return connection.getConnectionId(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java index 54632e30e2..239ec6879d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java @@ -18,8 +18,7 @@ package org.apache.activemq.broker.region; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.Message; + import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.StatsImpl; @@ -73,16 +72,5 @@ public class ConnectionStatistics extends StatsImpl { } } - /** - * Updates the statistics as a command is dispatched into the connection - */ - public void onCommand(Command command) { - if (command.isMessageDispatch()) { - enqueues.increment(); - } - } - public void onMessageDequeue(Message message) { - dequeues.increment(); - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index e987cb76ef..758bcd512c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -453,7 +453,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(node.getRegionDestination()!=null){ if(node!=QueueMessageReference.NULL_MESSAGE){ node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); - context.getConnection().getStatistics().onMessageDequeue(message); } try{ dispatchMatched(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 17e88dab45..89216a6d68 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -44,8 +44,8 @@ public class TopicSubscription extends AbstractSubscription{ private static final AtomicLong cursorNameCounter=new AtomicLong(0); protected PendingMessageCursor matched; final protected UsageManager usageManager; - protected AtomicLong dispatched=new AtomicLong(); - protected AtomicLong delivered=new AtomicLong(); + protected AtomicLong dispatchedCounter=new AtomicLong(); + protected AtomicLong prefetchExtension=new AtomicLong(); private int maximumPendingMessages=-1; private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy(); private int discarded=0; @@ -136,7 +136,7 @@ public class TopicSubscription extends AbstractSubscription{ MessageReference node=matched.next(); if(node.isExpired()){ matched.remove(); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); break; } @@ -154,7 +154,7 @@ public class TopicSubscription extends AbstractSubscription{ MessageReference node=matched.next(); if(node.getMessageId().equals(mdn.getMessageId())){ matched.remove(); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); break; } @@ -170,7 +170,7 @@ public class TopicSubscription extends AbstractSubscription{ boolean wasFull=isFull(); if(ack.isStandardAck()||ack.isPoisonAck()){ if(context.isInTransaction()){ - delivered.addAndGet(ack.getMessageCount()); + prefetchExtension.addAndGet(ack.getMessageCount()); context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ @@ -180,8 +180,7 @@ public class TopicSubscription extends AbstractSubscription{ } } dequeueCounter.addAndGet(ack.getMessageCount()); - dispatched.addAndGet(-ack.getMessageCount()); - delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); + prefetchExtension.addAndGet(ack.getMessageCount()); } }); }else{ @@ -189,8 +188,7 @@ public class TopicSubscription extends AbstractSubscription{ destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } dequeueCounter.addAndGet(ack.getMessageCount()); - dispatched.addAndGet(-ack.getMessageCount()); - delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); + prefetchExtension.addAndGet(ack.getMessageCount()); } if(wasFull&&!isFull()){ dispatchMatched(); @@ -198,7 +196,7 @@ public class TopicSubscription extends AbstractSubscription{ return; }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. - delivered.addAndGet(ack.getMessageCount()); + prefetchExtension.addAndGet(ack.getMessageCount()); if(wasFull&&!isFull()){ dispatchMatched(); } @@ -217,7 +215,7 @@ public class TopicSubscription extends AbstractSubscription{ } public int getDispatchedQueueSize(){ - return (int)(dispatched.get()-delivered.get()); + return (int)(dispatchedCounter.get()-dequeueCounter.get()); } public int getMaximumPendingMessages(){ @@ -225,7 +223,7 @@ public class TopicSubscription extends AbstractSubscription{ } public long getDispatchedCounter(){ - return dispatched.get(); + return dispatchedCounter.get(); } public long getEnqueueCounter(){ @@ -277,21 +275,21 @@ public class TopicSubscription extends AbstractSubscription{ // Implementation methods // ------------------------------------------------------------------------- private boolean isFull(){ - return dispatched.get()-delivered.get()>=info.getPrefetchSize(); + return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize(); } /** * @return true when 60% or more room is left for dispatching messages */ public boolean isLowWaterMark(){ - return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4); + return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4); } /** * @return true when 10% or less room is left for dispatching messages */ public boolean isHighWaterMark(){ - return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9); + return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9); } /** @@ -386,7 +384,7 @@ public class TopicSubscription extends AbstractSubscription{ md.setMessage(message); md.setConsumerId(info.getConsumerId()); md.setDestination(node.getRegionDestination().getActiveMQDestination()); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); // Keep track if this subscription is receiving messages from a single destination. if(singleDestination){ if(destination==null){ @@ -429,6 +427,8 @@ public class TopicSubscription extends AbstractSubscription{ } } - + public int getPrefetchSize() { + return (int) (info.getPrefetchSize() + prefetchExtension.get()); + } }