diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index baa755d563..ef2e33fc38 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -270,25 +270,27 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { - advisoryMessage.setDataStructure(command); - advisoryMessage.setPersistent(false); - advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); - advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); - advisoryMessage.setTargetConsumerId(targetConsumerId); - - advisoryMessage.setDestination(topic); - advisoryMessage.setResponseRequired(false); - advisoryMessage.setProducerId(advisoryProducerId); - boolean originalFlowControl = context.isProducerFlowControl(); - final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); - producerExchange.setConnectionContext(context); - producerExchange.setMutable(true); - try { - context.setProducerFlowControl(false); - next.send(producerExchange, advisoryMessage); - } finally { - context.setProducerFlowControl(originalFlowControl); + protected void fireAdvisory(ConnectionContext context,ActiveMQTopic topic,Command command, + ConsumerId targetConsumerId,ActiveMQMessage advisoryMessage) throws Exception{ + if(getBrokerService().isStarted()){ + advisoryMessage.setDataStructure(command); + advisoryMessage.setPersistent(false); + advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); + advisoryMessage.setMessageId(new MessageId(advisoryProducerId,messageIdGenerator.getNextSequenceId())); + advisoryMessage.setTargetConsumerId(targetConsumerId); + advisoryMessage.setDestination(topic); + advisoryMessage.setResponseRequired(false); + advisoryMessage.setProducerId(advisoryProducerId); + boolean originalFlowControl=context.isProducerFlowControl(); + final ProducerBrokerExchange producerExchange=new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + try{ + context.setProducerFlowControl(false); + next.send(producerExchange,advisoryMessage); + }finally{ + context.setProducerFlowControl(originalFlowControl); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index bdb873d257..961e776b06 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -295,35 +295,35 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public Response service(Command command){ Response response=null; - boolean responseRequired=command.isResponseRequired(); - int commandId=command.getCommandId(); - try{ - response=command.visit(this); - }catch(Throwable e){ + if(broker.getBrokerService().isStarted()){ + boolean responseRequired=command.isResponseRequired(); + int commandId=command.getCommandId(); + try{ + response=command.visit(this); + }catch(Throwable e){ + if(responseRequired){ + if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class) + serviceLog.debug("Error occured while processing sync command: "+e,e); + response=new ExceptionResponse(e); + }else{ + serviceException(e); + } + } if(responseRequired){ - if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class) - serviceLog.debug("Error occured while processing sync command: "+e,e); - response=new ExceptionResponse(e); - }else{ - serviceException(e); + if(response==null){ + response=new Response(); + } + response.setCorrelationId(commandId); + } + // The context may have been flagged so that the response is not sent. + if(context!=null){ + if(context.isDontSendReponse()){ + context.setDontSendReponse(false); + response=null; + } + context=null; } } - if(responseRequired){ - if(response==null){ - response=new Response(); - } - response.setCorrelationId(commandId); - } - - // The context may have been flagged so that the response is not sent. - if( context!=null ) { - if( context.isDontSendReponse() ) { - context.setDontSendReponse(false); - response=null; - } - context=null; - } - return response; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 45a6f8f50e..14d67d7ece 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -107,6 +107,19 @@ public class DestinationView implements DestinationViewMBean { public void setMemoryLimit(long limit) { destination.getUsageManager().setLimit(limit); } + + public double getAverageEnqueueTime(){ + return destination.getDestinationStatistics().getProcessTime().getAverageTime(); + } + + public long getMaxEnqueueTime(){ + return destination.getDestinationStatistics().getProcessTime().getMaxTime(); + } + + public long getMinEnqueueTime(){ + return destination.getDestinationStatistics().getProcessTime().getMinTime(); + } + public CompositeData[] browse() throws OpenDataException{ try { @@ -260,5 +273,4 @@ public class DestinationView implements DestinationViewMBean { } } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 906bd55a0d..7efa0912f1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -125,5 +125,21 @@ public interface DestinationViewMBean { * Browses the current destination with the given selector returning a list of messages */ public List browseMessages(String selector) throws InvalidSelectorException; + + + /** + * @return longest time a message is held by a destination + */ + public long getMaxEnqueueTime(); + + /** + * @return shortest time a message is held by a destination + */ + public long getMinEnqueueTime(); + + /** + * @return average time a message is held by a destination + */ + public double getAverageEnqueueTime(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 4db5a1a0a8..3d8a6f386a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -22,6 +22,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.management.TimeStatisticImpl; /** * The J2EE Statistics for the a Destination. @@ -36,6 +37,7 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl messages; protected PollCountStatisticImpl messagesCached; protected CountStatisticImpl dispatched; + protected TimeStatisticImpl processTime; public DestinationStatistics() { @@ -45,13 +47,14 @@ public class DestinationStatistics extends StatsImpl { consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination"); messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination"); messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache"); - + processTime = new TimeStatisticImpl("processTime","information around length of time messages are held by a destination"); addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); addStatistic("consumers", consumers); addStatistic("messages", messages); addStatistic("messagesCached", messagesCached); + addStatistic("processTime",processTime); } public CountStatisticImpl getEnqueues() { @@ -73,6 +76,18 @@ public class DestinationStatistics extends StatsImpl { public CountStatisticImpl getMessages() { return messages; } + + public void setMessagesCached(PollCountStatisticImpl messagesCached) { + this.messagesCached = messagesCached; + } + + public CountStatisticImpl getDispatched() { + return dispatched; + } + + public TimeStatisticImpl getProcessTime(){ + return this.processTime; + } public void reset() { super.reset(); @@ -89,6 +104,7 @@ public class DestinationStatistics extends StatsImpl { consumers.setEnabled(enabled); messages.setEnabled(enabled); messagesCached.setEnabled(enabled); + processTime.setEnabled(enabled); } @@ -100,6 +116,7 @@ public class DestinationStatistics extends StatsImpl { consumers.setParent(parent.consumers); messagesCached.setParent(parent.messagesCached); messages.setParent(parent.messages); + processTime.setParent(parent.processTime); } else { enqueues.setParent(null); @@ -108,14 +125,9 @@ public class DestinationStatistics extends StatsImpl { consumers.setParent(null); messagesCached.setParent(null); messages.setParent(null); + processTime.setParent(null); } } - public void setMessagesCached(PollCountStatisticImpl messagesCached) { - this.messagesCached = messagesCached; - } - - public CountStatisticImpl getDispatched() { - return dispatched; - } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index ec53811c66..419bfa6367 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -539,10 +539,15 @@ public class RegionBroker implements Broker { return result; } - public void preProcessDispatch(MessageDispatch messageDispatch){ - Message message = messageDispatch.getMessage(); - if(message != null) { - message.setBrokerOutTime(System.currentTimeMillis()); + public void preProcessDispatch(MessageDispatch messageDispatch){ + Message message=messageDispatch.getMessage(); + if(message!=null){ + long endTime=System.currentTimeMillis(); + message.setBrokerOutTime(endTime); + if(getBrokerService().isEnableStatistics()){ + long totalTime = endTime - message.getBrokerInTime(); + message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java b/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java index eba42b5783..82c89cbb11 100755 --- a/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java +++ b/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java @@ -137,6 +137,16 @@ public class PropertyExpression implements Expression { return new Integer(txId.toString()); } }); + JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerInTime", new SubExpression() { + public Object evaluate(Message message) { + return Long.valueOf(message.getBrokerInTime()); + } + }); + JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerOutTime", new SubExpression() { + public Object evaluate(Message message) { + return Long.valueOf(message.getBrokerOutTime()); + } + }); } private final String name; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java index b92b730cfd..362a205bc8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -153,8 +153,11 @@ public class StubBroker implements Broker { public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { return 0; } + + public void preProcessDispatch(MessageDispatch messageDispatch) { + } - public void processDispatch(MessageDispatch messageDispatch) { + public void postProcessDispatch(MessageDispatch messageDispatch) { } public void removeBroker(Connection connection, BrokerInfo info) { @@ -245,6 +248,10 @@ public class StubBroker implements Broker { public BrokerService getBrokerService(){ return null; } + + public boolean isExpired(MessageReference messageReference) { + return false; + } public void messageExpired(ConnectionContext context,MessageReference messageReference){ }