diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java index b1ec045a15..d8ed3c9f64 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java @@ -142,6 +142,7 @@ public class ActiveMQConnectionMetaData implements ConnectionMetaData { jmxProperties.put("JMSXGroupID", "1"); jmxProperties.put("JMSXGroupSeq", "1"); jmxProperties.put("JMSXDeliveryCount","1"); + jmxProperties.put("JMSXProducerTXID","1"); return jmxProperties.keys(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 6a93c4e9ad..25c3bcf844 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -896,8 +896,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC ActiveMQMessage message=createActiveMQMessage(md); beforeMessageIsConsumed(md); try{ - listener.onMessage(message); - afterMessageIsConsumed(md,false); + boolean expired=message.isExpired(); + if(!expired){ + listener.onMessage(message); + } + afterMessageIsConsumed(md,expired); }catch(RuntimeException e){ if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){ // Redeliver the message diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 91d98a4aed..f779dd4fe9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -186,12 +186,17 @@ public interface Broker extends Region, Service { */ BrokerInfo[] getPeerBrokerInfos(); - + /** + * Notify the Broker that a dispatch is going to happen + * @param messageDispatch + */ + public void preProcessDispatch(MessageDispatch messageDispatch); + /** * Notify the Broker that a dispatch has happened * @param messageDispatch */ - public void processDispatch(MessageDispatch messageDispatch); + public void postProcessDispatch(MessageDispatch messageDispatch); /** * @return true if the broker has stopped @@ -263,11 +268,18 @@ public interface Broker extends Region, Service { */ Broker getRoot(); + /** + * Determine if a message has expired -allows default behaviour to be overriden - + * as the timestamp set by the producer can be out of sync with the broker + * @param messageReference + * @return true if the message is expired + */ + public boolean isExpired(MessageReference messageReference); + /** * A Message has Expired * @param context * @param messageReference - * @throws Exception */ public void messageExpired(ConnectionContext context, MessageReference messageReference); @@ -275,7 +287,8 @@ public interface Broker extends Region, Service { * A message needs to go the a DLQ * @param context * @param messageReference - * @throws Exception */ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference); + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 08fe93d754..3a77813297 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -51,189 +51,190 @@ public class BrokerFilter implements Broker { final protected Broker next; - public BrokerFilter(Broker next) { + public BrokerFilter(Broker next){ this.next=next; } - + public Broker getAdaptor(Class type){ - if (type.isInstance(this)){ + if(type.isInstance(this)){ return this; } return next.getAdaptor(type); } - public Map getDestinationMap() { + public Map getDestinationMap(){ return next.getDestinationMap(); } - public Set getDestinations(ActiveMQDestination destination) { + public Set getDestinations(ActiveMQDestination destination){ return next.getDestinations(destination); } - public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { - next.acknowledge(consumerExchange, ack); + public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ + next.acknowledge(consumerExchange,ack); } - public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { - return next.messagePull(context, pull); + public Response messagePull(ConnectionContext context,MessagePull pull) throws Exception{ + return next.messagePull(context,pull); } - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - next.addConnection(context, info); + public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception{ + next.addConnection(context,info); } - public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - return next.addConsumer(context, info); + public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{ + return next.addConsumer(context,info); } - public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - next.addProducer(context, info); + public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{ + next.addProducer(context,info); } - public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { - next.commitTransaction(context, xid, onePhase); + public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception{ + next.commitTransaction(context,xid,onePhase); } - public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - next.removeSubscription(context, info); + public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception{ + next.removeSubscription(context,info); } - public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { + public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception{ return next.getPreparedTransactions(context); } - public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - return next.prepareTransaction(context, xid); + public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception{ + return next.prepareTransaction(context,xid); } - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - next.removeConnection(context, info, error); + public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Exception{ + next.removeConnection(context,info,error); } - public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - next.removeConsumer(context, info); + public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{ + next.removeConsumer(context,info); } - public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { - next.removeProducer(context, info); + public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception{ + next.removeProducer(context,info); } - public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { - next.rollbackTransaction(context, xid); + public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception{ + next.rollbackTransaction(context,xid); } - public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - next.send(producerExchange, messageSend); + public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception{ + next.send(producerExchange,messageSend); } - public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - next.beginTransaction(context, xid); + public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{ + next.beginTransaction(context,xid); } - public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { - next.forgetTransaction(context, transactionId); + public void forgetTransaction(ConnectionContext context,TransactionId transactionId) throws Exception{ + next.forgetTransaction(context,transactionId); } - public Connection[] getClients() throws Exception { + public Connection[] getClients() throws Exception{ return next.getClients(); } - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - return next.addDestination(context, destination); + public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Exception{ + return next.addDestination(context,destination); } - public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - next.removeDestination(context, destination, timeout); + public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) + throws Exception{ + next.removeDestination(context,destination,timeout); } - public ActiveMQDestination[] getDestinations() throws Exception { + public ActiveMQDestination[] getDestinations() throws Exception{ return next.getDestinations(); } - public void start() throws Exception { + public void start() throws Exception{ next.start(); } - public void stop() throws Exception { + public void stop() throws Exception{ next.stop(); } - public void addSession(ConnectionContext context, SessionInfo info) throws Exception { - next.addSession(context, info); + public void addSession(ConnectionContext context,SessionInfo info) throws Exception{ + next.addSession(context,info); } - public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { - next.removeSession(context, info); + public void removeSession(ConnectionContext context,SessionInfo info) throws Exception{ + next.removeSession(context,info); } - public BrokerId getBrokerId() { + public BrokerId getBrokerId(){ return next.getBrokerId(); } - public String getBrokerName() { + public String getBrokerName(){ return next.getBrokerName(); } - - public void gc() { + + public void gc(){ next.gc(); } - public void addBroker(Connection connection,BrokerInfo info){ - next.addBroker(connection, info); - } - - public void removeBroker(Connection connection,BrokerInfo info){ - next.removeBroker(connection, info); + next.addBroker(connection,info); } + public void removeBroker(Connection connection,BrokerInfo info){ + next.removeBroker(connection,info); + } public BrokerInfo[] getPeerBrokerInfos(){ return next.getPeerBrokerInfos(); } - - public void processDispatch(MessageDispatch messageDispatch){ - next.processDispatch(messageDispatch); + + public void preProcessDispatch(MessageDispatch messageDispatch){ + next.preProcessDispatch(messageDispatch); } - + + public void postProcessDispatch(MessageDispatch messageDispatch){ + next.postProcessDispatch(messageDispatch); + } + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ next.processDispatchNotification(messageDispatchNotification); } - + public boolean isStopped(){ return next.isStopped(); } - + public Set getDurableDestinations(){ return next.getDurableDestinations(); } - + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ - next.addDestinationInfo(context, info); - + next.addDestinationInfo(context,info); } public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ - next.removeDestinationInfo(context, info); - + next.removeDestinationInfo(context,info); } public boolean isFaultTolerantConfiguration(){ return next.isFaultTolerantConfiguration(); } - public ConnectionContext getAdminConnectionContext() { + public ConnectionContext getAdminConnectionContext(){ return next.getAdminConnectionContext(); } - public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { + public void setAdminConnectionContext(ConnectionContext adminConnectionContext){ next.setAdminConnectionContext(adminConnectionContext); } - - public Store getTempDataStore() { + + public Store getTempDataStore(){ return next.getTempDataStore(); } - + public URI getVmConnectorURI(){ return next.getVmConnectorURI(); } @@ -241,20 +242,24 @@ public class BrokerFilter implements Broker { public void brokerServiceStarted(){ next.brokerServiceStarted(); } - + public BrokerService getBrokerService(){ return next.getBrokerService(); } + public boolean isExpired(MessageReference messageReference){ + return next.isExpired(messageReference); + } + public void messageExpired(ConnectionContext context,MessageReference message){ - next.messageExpired(context,message); + next.messageExpired(context,message); } public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){ - next.sendToDeadLetterQueue(context,messageReference); + next.sendToDeadLetterQueue(context,messageReference); } - public Broker getRoot() { - return next.getRoot(); + public Broker getRoot(){ + return next.getRoot(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index b5658fb572..4a6f8d3c50 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -185,13 +185,10 @@ public class EmptyBroker implements Broker { return null; } - /** - * Notifiy the Broker that a dispatch has happened - * - * @param messageDispatch - */ - public void processDispatch(MessageDispatch messageDispatch) { - + public void preProcessDispatch(MessageDispatch messageDispatch) { + } + + public void postProcessDispatch(MessageDispatch messageDispatch) { } public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { @@ -245,6 +242,10 @@ public class EmptyBroker implements Broker { return null; } + public boolean isExpired(MessageReference messageReference) { + return false; + } + public void messageExpired(ConnectionContext context,MessageReference message){ } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 071893fba8..6e799ce379 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -188,7 +188,11 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } - public void processDispatch(MessageDispatch messageDispatch) { + public void preProcessDispatch(MessageDispatch messageDispatch) { + throw new BrokerStoppedException(this.message); + } + + public void postProcessDispatch(MessageDispatch messageDispatch) { throw new BrokerStoppedException(this.message); } @@ -244,6 +248,10 @@ public class ErrorBroker implements Broker { public BrokerService getBrokerService(){ throw new BrokerStoppedException(this.message); } + + public boolean isExpired(MessageReference messageReference) { + throw new BrokerStoppedException(this.message); + } public void messageExpired(ConnectionContext context,MessageReference message){ throw new BrokerStoppedException(this.message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 849495fd39..ccec451ea6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -200,8 +200,12 @@ public class MutableBrokerFilter implements Broker { return getNext().getPeerBrokerInfos(); } - public void processDispatch(MessageDispatch messageDispatch){ - getNext().processDispatch(messageDispatch); + public void preProcessDispatch(MessageDispatch messageDispatch){ + getNext().preProcessDispatch(messageDispatch); + } + + public void postProcessDispatch(MessageDispatch messageDispatch){ + getNext().postProcessDispatch(messageDispatch); } public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ @@ -259,6 +263,9 @@ public class MutableBrokerFilter implements Broker { return getNext().getBrokerService(); } + public boolean isExpired(MessageReference messageReference) { + return getNext().isExpired(messageReference); + } public void messageExpired(ConnectionContext context,MessageReference message){ getNext().messageExpired(context,message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 73042bf22f..bdb873d257 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -741,7 +741,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(message.isMessageDispatch()) { MessageDispatch md=(MessageDispatch) message; Runnable sub=md.getTransmitCallback(); - broker.processDispatch(md); + broker.postProcessDispatch(md); if(sub!=null){ sub.run(); } @@ -749,25 +749,26 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } } - protected void processDispatch(Command command) throws IOException { - try { - if( !disposed.get() ) { - dispatch(command); + protected void processDispatch(Command command) throws IOException{ + final MessageDispatch messageDispatch=(MessageDispatch)(command.isMessageDispatch()?command:null); + try{ + if(!disposed.get()){ + if(messageDispatch!=null){ + broker.preProcessDispatch(messageDispatch); + } + dispatch(command); } - } finally { - - if(command.isMessageDispatch()){ - MessageDispatch md=(MessageDispatch) command; - Runnable sub=md.getTransmitCallback(); - broker.processDispatch(md); + }finally{ + if(messageDispatch!=null){ + Runnable sub=messageDispatch.getTransmitCallback(); + broker.postProcessDispatch(messageDispatch); if(sub!=null){ sub.run(); } } - getStatistics().getDequeues().increment(); } - } + } @@ -918,7 +919,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(command.isMessageDispatch()) { MessageDispatch md=(MessageDispatch) command; Runnable sub=md.getTransmitCallback(); - broker.processDispatch(md); + broker.postProcessDispatch(md); if(sub!=null){ sub.run(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index d92fdb91ca..f4dc196b99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -283,7 +283,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * * @param messageDispatch */ - public void processDispatch(MessageDispatch messageDispatch){ + public void postProcessDispatch(MessageDispatch messageDispatch){ MessageDispatchNotification mdn=new MessageDispatchNotification(); mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); @@ -293,7 +293,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ mdn.setMessageId(msg.getMessageId()); sendAsyncToSlave(mdn); } - super.processDispatch(messageDispatch); + super.postProcessDispatch(messageDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 23db44f6d5..50cbd90597 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -382,7 +382,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ pending.remove(); // Message may have been sitting in the pending list a while // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + if(node!=QueueMessageReference.NULL_MESSAGE&&broker.isExpired(node)){ broker.messageExpired(getContext(),node); dequeueCounter++; continue; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 4459500322..c3612d3a31 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -137,7 +137,7 @@ public class Queue implements Destination, Task { public boolean recoverMessage(Message message){ // Message could have expired while it was being loaded.. - if(message.isExpired()){ + if(broker.isExpired(message)){ broker.messageExpired(createConnectionContext(),message); destinationStatistics.getMessages().decrement(); return true; @@ -379,7 +379,7 @@ public class Queue implements Destination, Task { try { // While waiting for space to free up... the message may have expired. - if(message.isExpired()) { + if(broker.isExpired(message)) { broker.messageExpired(context,message); destinationStatistics.getMessages().decrement(); } else { @@ -455,7 +455,7 @@ public class Queue implements Destination, Task { try { // It could take while before we receive the commit // op, by that time the message could have expired.. - if(message.isExpired()){ + if(broker.isExpired(message)){ broker.messageExpired(context,message); destinationStatistics.getMessages().decrement(); return; @@ -1014,7 +1014,7 @@ public class Queue implements Destination, Task { while(messages.hasNext()&&count 0 ) && !context.isInRecoveryMode() ) { @@ -286,7 +286,7 @@ public class Topic implements Destination { public void run() { // While waiting for space to free up... the message may have expired. - if(message.isExpired()){ + if(broker.isExpired(message)){ broker.messageExpired(context,message); destinationStatistics.getMessages().decrement(); @@ -357,7 +357,7 @@ public class Topic implements Destination { public void afterCommit() throws Exception { // It could take while before we receive the commit // operration.. by that time the message could have expired.. - if( message.isExpired() ) { + if(broker.isExpired(message) ) { broker.messageExpired(context,message); message.decrementReferenceCount(); destinationStatistics.getMessages().decrement(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 507757c9cf..9575ceb902 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -129,7 +129,7 @@ public class TopicSubscription extends AbstractSubscription{ matched.reset(); while(matched.hasNext()){ MessageReference node=matched.next(); - if(node.isExpired()){ + if(broker.isExpired(node)){ matched.remove(); dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); @@ -361,7 +361,7 @@ public class TopicSubscription extends AbstractSubscription{ matched.remove(); // Message may have been sitting in the matched list a while // waiting for the consumer to ak the message. - if(message.isExpired()){ + if(broker.isExpired(message)){ message.decrementReferenceCount(); broker.messageExpired(getContext(),message); dequeueCounter.incrementAndGet(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 55f754dcd9..9020ee9b23 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -28,6 +28,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy private boolean processNonPersistent=true; private boolean processExpired=true; + public boolean isSendToDeadLetterQueue(Message message){ boolean result=false; if(message!=null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java index e8a84d97b3..962ca72d32 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker.util; import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.Message; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java index d147d2a9d5..4b4269cba8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java @@ -189,9 +189,9 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport { return super.prepareTransaction(context, xid); } - public void processDispatch(MessageDispatch messageDispatch) { + public void postProcessDispatch(MessageDispatch messageDispatch) { trace(messageDispatch); - super.processDispatch(messageDispatch); + super.postProcessDispatch(messageDispatch); } public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index fef94a7b58..4cbfed54e5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -54,6 +54,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess protected long expiration; protected long timestamp; protected long arrival; + protected long brokerInTime; + protected long brokerOutTime; protected String correlationId; protected ActiveMQDestination replyTo; protected boolean persistent; @@ -83,6 +85,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess private BrokerId [] brokerPath; protected boolean droppable = false; private BrokerId [] cluster; + + abstract public Message copy(); @@ -123,6 +127,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess copy.arrival = arrival; copy.connection = connection; copy.regionDestination = regionDestination; + copy.brokerInTime=brokerInTime; + copy.brokerOutTime=brokerOutTime; //copying the broker path breaks networks - if a consumer re-uses a consumed //message and forwards it on //copy.brokerPath = brokerPath; @@ -630,4 +636,26 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess public boolean isMessage() { return true; } + + /** + * @openwire:property version=3 + */ + public long getBrokerInTime(){ + return this.brokerInTime; + } + + public void setBrokerInTime(long brokerInTime){ + this.brokerInTime=brokerInTime; + } + + /** + * @openwire:property version=3 + */ + public long getBrokerOutTime(){ + return this.brokerOutTime; + } + + public void setBrokerOutTime(long brokerOutTime){ + this.brokerOutTime=brokerOutTime; + } }