Added support for length of time messages are processed by the broker -

fix for https://issues.apache.org/activemq/browse/AMQ-1160,
https://issues.apache.org/activemq/browse/AMQ-1072,
https://issues.apache.org/activemq/browse/AMQ-936 
and ground work for for https://issues.apache.org/activemq/browse/AMQ-567

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@561026 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-30 16:01:37 +00:00
parent b97f7d8479
commit a401575569
18 changed files with 204 additions and 126 deletions

View File

@ -142,6 +142,7 @@ public class ActiveMQConnectionMetaData implements ConnectionMetaData {
jmxProperties.put("JMSXGroupID", "1"); jmxProperties.put("JMSXGroupID", "1");
jmxProperties.put("JMSXGroupSeq", "1"); jmxProperties.put("JMSXGroupSeq", "1");
jmxProperties.put("JMSXDeliveryCount","1"); jmxProperties.put("JMSXDeliveryCount","1");
jmxProperties.put("JMSXProducerTXID","1");
return jmxProperties.keys(); return jmxProperties.keys();
} }
} }

View File

@ -896,8 +896,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ActiveMQMessage message=createActiveMQMessage(md); ActiveMQMessage message=createActiveMQMessage(md);
beforeMessageIsConsumed(md); beforeMessageIsConsumed(md);
try{ try{
boolean expired=message.isExpired();
if(!expired){
listener.onMessage(message); listener.onMessage(message);
afterMessageIsConsumed(md,false); }
afterMessageIsConsumed(md,expired);
}catch(RuntimeException e){ }catch(RuntimeException e){
if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){ if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){
// Redeliver the message // Redeliver the message

View File

@ -186,12 +186,17 @@ public interface Broker extends Region, Service {
*/ */
BrokerInfo[] getPeerBrokerInfos(); BrokerInfo[] getPeerBrokerInfos();
/**
* Notify the Broker that a dispatch is going to happen
* @param messageDispatch
*/
public void preProcessDispatch(MessageDispatch messageDispatch);
/** /**
* Notify the Broker that a dispatch has happened * Notify the Broker that a dispatch has happened
* @param messageDispatch * @param messageDispatch
*/ */
public void processDispatch(MessageDispatch messageDispatch); public void postProcessDispatch(MessageDispatch messageDispatch);
/** /**
* @return true if the broker has stopped * @return true if the broker has stopped
@ -263,11 +268,18 @@ public interface Broker extends Region, Service {
*/ */
Broker getRoot(); Broker getRoot();
/**
* Determine if a message has expired -allows default behaviour to be overriden -
* as the timestamp set by the producer can be out of sync with the broker
* @param messageReference
* @return true if the message is expired
*/
public boolean isExpired(MessageReference messageReference);
/** /**
* A Message has Expired * A Message has Expired
* @param context * @param context
* @param messageReference * @param messageReference
* @throws Exception
*/ */
public void messageExpired(ConnectionContext context, MessageReference messageReference); public void messageExpired(ConnectionContext context, MessageReference messageReference);
@ -275,7 +287,8 @@ public interface Broker extends Region, Service {
* A message needs to go the a DLQ * A message needs to go the a DLQ
* @param context * @param context
* @param messageReference * @param messageReference
* @throws Exception
*/ */
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference); public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference);
} }

View File

@ -142,7 +142,8 @@ public class BrokerFilter implements Broker {
return next.addDestination(context,destination); return next.addDestination(context,destination);
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
next.removeDestination(context,destination,timeout); next.removeDestination(context,destination,timeout);
} }
@ -178,7 +179,6 @@ public class BrokerFilter implements Broker {
next.gc(); next.gc();
} }
public void addBroker(Connection connection,BrokerInfo info){ public void addBroker(Connection connection,BrokerInfo info){
next.addBroker(connection,info); next.addBroker(connection,info);
} }
@ -187,13 +187,16 @@ public class BrokerFilter implements Broker {
next.removeBroker(connection,info); next.removeBroker(connection,info);
} }
public BrokerInfo[] getPeerBrokerInfos(){ public BrokerInfo[] getPeerBrokerInfos(){
return next.getPeerBrokerInfos(); return next.getPeerBrokerInfos();
} }
public void processDispatch(MessageDispatch messageDispatch){ public void preProcessDispatch(MessageDispatch messageDispatch){
next.processDispatch(messageDispatch); next.preProcessDispatch(messageDispatch);
}
public void postProcessDispatch(MessageDispatch messageDispatch){
next.postProcessDispatch(messageDispatch);
} }
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{
@ -210,12 +213,10 @@ public class BrokerFilter implements Broker {
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.addDestinationInfo(context,info); next.addDestinationInfo(context,info);
} }
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.removeDestinationInfo(context,info); next.removeDestinationInfo(context,info);
} }
public boolean isFaultTolerantConfiguration(){ public boolean isFaultTolerantConfiguration(){
@ -246,6 +247,10 @@ public class BrokerFilter implements Broker {
return next.getBrokerService(); return next.getBrokerService();
} }
public boolean isExpired(MessageReference messageReference){
return next.isExpired(messageReference);
}
public void messageExpired(ConnectionContext context,MessageReference message){ public void messageExpired(ConnectionContext context,MessageReference message){
next.messageExpired(context,message); next.messageExpired(context,message);
} }

View File

@ -185,13 +185,10 @@ public class EmptyBroker implements Broker {
return null; return null;
} }
/** public void preProcessDispatch(MessageDispatch messageDispatch) {
* Notifiy the Broker that a dispatch has happened }
*
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch) {
public void postProcessDispatch(MessageDispatch messageDispatch) {
} }
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
@ -245,6 +242,10 @@ public class EmptyBroker implements Broker {
return null; return null;
} }
public boolean isExpired(MessageReference messageReference) {
return false;
}
public void messageExpired(ConnectionContext context,MessageReference message){ public void messageExpired(ConnectionContext context,MessageReference message){
} }

View File

@ -188,7 +188,11 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public void processDispatch(MessageDispatch messageDispatch) { public void preProcessDispatch(MessageDispatch messageDispatch) {
throw new BrokerStoppedException(this.message);
}
public void postProcessDispatch(MessageDispatch messageDispatch) {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
@ -245,6 +249,10 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public boolean isExpired(MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
public void messageExpired(ConnectionContext context,MessageReference message){ public void messageExpired(ConnectionContext context,MessageReference message){
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }

View File

@ -200,8 +200,12 @@ public class MutableBrokerFilter implements Broker {
return getNext().getPeerBrokerInfos(); return getNext().getPeerBrokerInfos();
} }
public void processDispatch(MessageDispatch messageDispatch){ public void preProcessDispatch(MessageDispatch messageDispatch){
getNext().processDispatch(messageDispatch); getNext().preProcessDispatch(messageDispatch);
}
public void postProcessDispatch(MessageDispatch messageDispatch){
getNext().postProcessDispatch(messageDispatch);
} }
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{
@ -259,6 +263,9 @@ public class MutableBrokerFilter implements Broker {
return getNext().getBrokerService(); return getNext().getBrokerService();
} }
public boolean isExpired(MessageReference messageReference) {
return getNext().isExpired(messageReference);
}
public void messageExpired(ConnectionContext context,MessageReference message){ public void messageExpired(ConnectionContext context,MessageReference message){
getNext().messageExpired(context,message); getNext().messageExpired(context,message);

View File

@ -741,7 +741,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(message.isMessageDispatch()) { if(message.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) message; MessageDispatch md=(MessageDispatch) message;
Runnable sub=md.getTransmitCallback(); Runnable sub=md.getTransmitCallback();
broker.processDispatch(md); broker.postProcessDispatch(md);
if(sub!=null){ if(sub!=null){
sub.run(); sub.run();
} }
@ -750,21 +750,22 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
} }
protected void processDispatch(Command command) throws IOException{ protected void processDispatch(Command command) throws IOException{
final MessageDispatch messageDispatch=(MessageDispatch)(command.isMessageDispatch()?command:null);
try{ try{
if(!disposed.get()){ if(!disposed.get()){
if(messageDispatch!=null){
broker.preProcessDispatch(messageDispatch);
}
dispatch(command); dispatch(command);
} }
}finally{ }finally{
if(messageDispatch!=null){
if(command.isMessageDispatch()){ Runnable sub=messageDispatch.getTransmitCallback();
MessageDispatch md=(MessageDispatch) command; broker.postProcessDispatch(messageDispatch);
Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){ if(sub!=null){
sub.run(); sub.run();
} }
} }
getStatistics().getDequeues().increment(); getStatistics().getDequeues().increment();
} }
} }
@ -918,7 +919,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(command.isMessageDispatch()) { if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command; MessageDispatch md=(MessageDispatch) command;
Runnable sub=md.getTransmitCallback(); Runnable sub=md.getTransmitCallback();
broker.processDispatch(md); broker.postProcessDispatch(md);
if(sub!=null){ if(sub!=null){
sub.run(); sub.run();
} }

View File

@ -283,7 +283,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* *
* @param messageDispatch * @param messageDispatch
*/ */
public void processDispatch(MessageDispatch messageDispatch){ public void postProcessDispatch(MessageDispatch messageDispatch){
MessageDispatchNotification mdn=new MessageDispatchNotification(); MessageDispatchNotification mdn=new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
@ -293,7 +293,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
mdn.setMessageId(msg.getMessageId()); mdn.setMessageId(msg.getMessageId());
sendAsyncToSlave(mdn); sendAsyncToSlave(mdn);
} }
super.processDispatch(messageDispatch); super.postProcessDispatch(messageDispatch);
} }
/** /**

View File

@ -382,7 +382,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pending.remove(); pending.remove();
// Message may have been sitting in the pending list a while // Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message. // waiting for the consumer to ak the message.
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ if(node!=QueueMessageReference.NULL_MESSAGE&&broker.isExpired(node)){
broker.messageExpired(getContext(),node); broker.messageExpired(getContext(),node);
dequeueCounter++; dequeueCounter++;
continue; continue;

View File

@ -137,7 +137,7 @@ public class Queue implements Destination, Task {
public boolean recoverMessage(Message message){ public boolean recoverMessage(Message message){
// Message could have expired while it was being loaded.. // Message could have expired while it was being loaded..
if(message.isExpired()){ if(broker.isExpired(message)){
broker.messageExpired(createConnectionContext(),message); broker.messageExpired(createConnectionContext(),message);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
return true; return true;
@ -379,7 +379,7 @@ public class Queue implements Destination, Task {
try { try {
// While waiting for space to free up... the message may have expired. // While waiting for space to free up... the message may have expired.
if(message.isExpired()) { if(broker.isExpired(message)) {
broker.messageExpired(context,message); broker.messageExpired(context,message);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
} else { } else {
@ -455,7 +455,7 @@ public class Queue implements Destination, Task {
try { try {
// It could take while before we receive the commit // It could take while before we receive the commit
// op, by that time the message could have expired.. // op, by that time the message could have expired..
if(message.isExpired()){ if(broker.isExpired(message)){
broker.messageExpired(context,message); broker.messageExpired(context,message);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
return; return;
@ -1014,7 +1014,7 @@ public class Queue implements Destination, Task {
while(messages.hasNext()&&count<toPageIn){ while(messages.hasNext()&&count<toPageIn){
MessageReference node=messages.next(); MessageReference node=messages.next();
messages.remove(); messages.remove();
if(!node.isExpired()){ if(!broker.isExpired(node)){
node=createMessageReference(node.getMessage()); node=createMessageReference(node.getMessage());
result.add(node); result.add(node);
count++; count++;

View File

@ -385,6 +385,7 @@ public class RegionBroker implements Broker {
public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{ public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
long si=sequenceGenerator.getNextSequenceId(); long si=sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si); message.getMessageId().setBrokerSequenceId(si);
message.setBrokerInTime(System.currentTimeMillis());
if(producerExchange.isMutable()||producerExchange.getRegion()==null){ if(producerExchange.isMutable()||producerExchange.getRegion()==null){
ActiveMQDestination destination=message.getDestination(); ActiveMQDestination destination=message.getDestination();
// ensure the destination is registered with the RegionBroker // ensure the destination is registered with the RegionBroker
@ -538,8 +539,14 @@ public class RegionBroker implements Broker {
return result; return result;
} }
public void processDispatch(MessageDispatch messageDispatch){ public void preProcessDispatch(MessageDispatch messageDispatch){
Message message = messageDispatch.getMessage();
if(message != null) {
message.setBrokerOutTime(System.currentTimeMillis());
}
}
public void postProcessDispatch(MessageDispatch messageDispatch){
} }
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
@ -625,6 +632,10 @@ public class RegionBroker implements Broker {
return brokerService; return brokerService;
} }
public boolean isExpired(MessageReference messageReference) {
return messageReference.isExpired();
}
public void messageExpired(ConnectionContext context,MessageReference node){ public void messageExpired(ConnectionContext context,MessageReference node){
if(log.isDebugEnabled()){ if(log.isDebugEnabled()){
log.debug("Message expired "+node); log.debug("Message expired "+node);

View File

@ -263,7 +263,7 @@ public class Topic implements Destination {
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
// destination.. it may have expired. // destination.. it may have expired.
if( message.isExpired() ) { if( broker.isExpired(message) ) {
broker.messageExpired(context,message); broker.messageExpired(context,message);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) { if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
@ -286,7 +286,7 @@ public class Topic implements Destination {
public void run() { public void run() {
// While waiting for space to free up... the message may have expired. // While waiting for space to free up... the message may have expired.
if(message.isExpired()){ if(broker.isExpired(message)){
broker.messageExpired(context,message); broker.messageExpired(context,message);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
@ -357,7 +357,7 @@ public class Topic implements Destination {
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
// It could take while before we receive the commit // It could take while before we receive the commit
// operration.. by that time the message could have expired.. // operration.. by that time the message could have expired..
if( message.isExpired() ) { if(broker.isExpired(message) ) {
broker.messageExpired(context,message); broker.messageExpired(context,message);
message.decrementReferenceCount(); message.decrementReferenceCount();
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();

View File

@ -129,7 +129,7 @@ public class TopicSubscription extends AbstractSubscription{
matched.reset(); matched.reset();
while(matched.hasNext()){ while(matched.hasNext()){
MessageReference node=matched.next(); MessageReference node=matched.next();
if(node.isExpired()){ if(broker.isExpired(node)){
matched.remove(); matched.remove();
dispatchedCounter.incrementAndGet(); dispatchedCounter.incrementAndGet();
node.decrementReferenceCount(); node.decrementReferenceCount();
@ -361,7 +361,7 @@ public class TopicSubscription extends AbstractSubscription{
matched.remove(); matched.remove();
// Message may have been sitting in the matched list a while // Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message. // waiting for the consumer to ak the message.
if(message.isExpired()){ if(broker.isExpired(message)){
message.decrementReferenceCount(); message.decrementReferenceCount();
broker.messageExpired(getContext(),message); broker.messageExpired(getContext(),message);
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();

View File

@ -28,6 +28,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy
private boolean processNonPersistent=true; private boolean processNonPersistent=true;
private boolean processExpired=true; private boolean processExpired=true;
public boolean isSendToDeadLetterQueue(Message message){ public boolean isSendToDeadLetterQueue(Message message){
boolean result=false; boolean result=false;
if(message!=null){ if(message!=null){

View File

@ -18,7 +18,6 @@
package org.apache.activemq.broker.util; package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;

View File

@ -189,9 +189,9 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
return super.prepareTransaction(context, xid); return super.prepareTransaction(context, xid);
} }
public void processDispatch(MessageDispatch messageDispatch) { public void postProcessDispatch(MessageDispatch messageDispatch) {
trace(messageDispatch); trace(messageDispatch);
super.processDispatch(messageDispatch); super.postProcessDispatch(messageDispatch);
} }
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {

View File

@ -54,6 +54,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
protected long expiration; protected long expiration;
protected long timestamp; protected long timestamp;
protected long arrival; protected long arrival;
protected long brokerInTime;
protected long brokerOutTime;
protected String correlationId; protected String correlationId;
protected ActiveMQDestination replyTo; protected ActiveMQDestination replyTo;
protected boolean persistent; protected boolean persistent;
@ -84,6 +86,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
protected boolean droppable = false; protected boolean droppable = false;
private BrokerId [] cluster; private BrokerId [] cluster;
abstract public Message copy(); abstract public Message copy();
protected void copy(Message copy) { protected void copy(Message copy) {
@ -123,6 +127,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
copy.arrival = arrival; copy.arrival = arrival;
copy.connection = connection; copy.connection = connection;
copy.regionDestination = regionDestination; copy.regionDestination = regionDestination;
copy.brokerInTime=brokerInTime;
copy.brokerOutTime=brokerOutTime;
//copying the broker path breaks networks - if a consumer re-uses a consumed //copying the broker path breaks networks - if a consumer re-uses a consumed
//message and forwards it on //message and forwards it on
//copy.brokerPath = brokerPath; //copy.brokerPath = brokerPath;
@ -630,4 +636,26 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
public boolean isMessage() { public boolean isMessage() {
return true; return true;
} }
/**
* @openwire:property version=3
*/
public long getBrokerInTime(){
return this.brokerInTime;
}
public void setBrokerInTime(long brokerInTime){
this.brokerInTime=brokerInTime;
}
/**
* @openwire:property version=3
*/
public long getBrokerOutTime(){
return this.brokerOutTime;
}
public void setBrokerOutTime(long brokerOutTime){
this.brokerOutTime=brokerOutTime;
}
} }