From a414c20dcbab7279a168eed3d40ac3c4d342cc61 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 20 Jul 2007 17:08:10 +0000 Subject: [PATCH] Fix for: http://issues.apache.org/activemq/browse/AMQ-1207 http://issues.apache.org/activemq/browse/AMQ-880 http://issues.apache.org/activemq/browse/AMQ-450 http://issues.apache.org/activemq/browse/AMQ-879 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@558054 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 16 ++- .../activemq/advisory/AdvisorySupport.java | 7 ++ .../org/apache/activemq/broker/Broker.java | 41 ++++++- .../apache/activemq/broker/BrokerFilter.java | 21 +++- .../apache/activemq/broker/EmptyBroker.java | 21 +++- .../apache/activemq/broker/ErrorBroker.java | 15 ++- .../activemq/broker/MutableBrokerFilter.java | 22 +++- .../broker/region/AbstractRegion.java | 5 +- .../broker/region/DestinationFactoryImpl.java | 107 +++++++++--------- .../broker/region/PrefetchSubscription.java | 16 +-- .../apache/activemq/broker/region/Queue.java | 30 +++-- .../activemq/broker/region/RegionBroker.java | 52 ++++++++- .../broker/region/TempQueueRegion.java | 2 +- .../apache/activemq/broker/region/Topic.java | 20 ++-- .../broker/region/TopicSubscription.java | 21 +++- .../policy/AbstractDeadLetterStrategy.java | 74 ++++++++++++ .../region/policy/DeadLetterStrategy.java | 9 ++ .../policy/IndividualDeadLetterStrategy.java | 2 +- .../policy/SharedDeadLetterStrategy.java | 2 +- .../apache/activemq/broker/StubBroker.java | 11 ++ 20 files changed, 367 insertions(+), 127 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index dbbd2bd857..baa755d563 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -17,6 +17,7 @@ */ package org.apache.activemq.advisory; +import java.io.IOException; import java.util.Iterator; import org.apache.activemq.broker.Broker; @@ -24,6 +25,7 @@ import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -38,6 +40,8 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.util.concurrent.ConcurrentHashMap; @@ -49,7 +53,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class AdvisoryBroker extends BrokerFilter { - //private static final Log log = LogFactory.getLog(AdvisoryBroker.class); + private static final Log log = LogFactory.getLog(AdvisoryBroker.class); protected final ConcurrentHashMap connections = new ConcurrentHashMap(); protected final ConcurrentHashMap consumers = new ConcurrentHashMap(); @@ -229,6 +233,16 @@ public class AdvisoryBroker extends BrokerFilter { } } + public void messageExpired(ConnectionContext context,MessageReference messageReference){ + next.messageExpired(context,messageReference); + try{ + ActiveMQTopic topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); + fireAdvisory(context,topic,messageReference.getMessage()); + }catch(Exception e){ + log.warn("Failed to fire message expired advisory"); + } + } + protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { fireAdvisory(context, topic, command, null); } diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index de5eaf2f3c..380eb6b84b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -64,6 +64,13 @@ public class AdvisorySupport { return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName()); } + public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) { + if (destination.isQueue()) { + return getExpiredQueueMessageAdvisoryTopic(destination); + } + return getExpiredTopicMessageAdvisoryTopic(destination); + } + public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) { String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName(); return new ActiveMQTopic(name); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 986f6ceb10..91d98a4aed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -20,19 +20,15 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Set; import org.apache.activemq.Service; -import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Region; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; @@ -135,6 +131,8 @@ public interface Broker extends Region, Service { /** * Gets a list of all the prepared xa transactions. + * @param context transaction ids + * @return * @throws Exception TODO */ public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception; @@ -151,7 +149,7 @@ public interface Broker extends Region, Service { * Prepares a transaction. Only valid for xa transactions. * @param context * @param xid - * @return + * @return id * @throws Exception TODO */ public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception; @@ -176,6 +174,9 @@ public interface Broker extends Region, Service { /** * Forgets a transaction. + * @param context + * @param transactionId + * @throws Exception */ public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception; @@ -246,7 +247,35 @@ public interface Broker extends Region, Service { */ public URI getVmConnectorURI(); + /** + * called when the brokerService starts + */ public void brokerServiceStarted(); + /** + * @return the BrokerService + */ BrokerService getBrokerService(); + + /** + * Ensure we get the Broker at the top of the Stack + * @return the broker at the top of the Stack + */ + Broker getRoot(); + + /** + * A Message has Expired + * @param context + * @param messageReference + * @throws Exception + */ + public void messageExpired(ConnectionContext context, MessageReference messageReference); + + /** + * A message needs to go the a DLQ + * @param context + * @param messageReference + * @throws Exception + */ + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index e972eeeaf8..08fe93d754 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -17,9 +17,12 @@ */ package org.apache.activemq.broker; +import java.net.URI; +import java.util.Map; +import java.util.Set; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -38,10 +41,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; -import java.net.URI; -import java.util.Map; -import java.util.Set; - /** * Allows you to intercept broker operation so that features such as security can be * implemented as a pluggable filter. @@ -246,4 +245,16 @@ public class BrokerFilter implements Broker { public BrokerService getBrokerService(){ return next.getBrokerService(); } + + public void messageExpired(ConnectionContext context,MessageReference message){ + next.messageExpired(context,message); + } + + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){ + next.sendToDeadLetterQueue(context,messageReference); + } + + public Broker getRoot() { + return next.getRoot(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index f5f961f2f1..b5658fb572 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -17,9 +17,13 @@ */ package org.apache.activemq.broker; +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import java.util.Set; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -38,11 +42,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; -import java.net.URI; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - /** * Dumb implementation - used to be overriden by listeners * @@ -245,4 +244,14 @@ public class EmptyBroker implements Broker { public BrokerService getBrokerService(){ return null; } + + public void messageExpired(ConnectionContext context,MessageReference message){ + } + + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){ + } + + public Broker getRoot(){ + return null; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index a714c9a1cc..071893fba8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -21,10 +21,9 @@ import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; - import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -245,4 +244,16 @@ public class ErrorBroker implements Broker { public BrokerService getBrokerService(){ throw new BrokerStoppedException(this.message); } + + public void messageExpired(ConnectionContext context,MessageReference message){ + throw new BrokerStoppedException(this.message); + } + + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){ + throw new BrokerStoppedException(this.message); + } + + public Broker getRoot(){ + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index d875287d86..849495fd39 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -17,9 +17,12 @@ */ package org.apache.activemq.broker; +import java.net.URI; +import java.util.Map; +import java.util.Set; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -38,10 +41,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; -import java.net.URI; -import java.util.Map; -import java.util.Set; - /** * Like a BrokerFilter but it allows you to switch the getNext().broker. This has more * overhead than a BrokerFilter since access to the getNext().broker has to synchronized @@ -260,4 +259,17 @@ public class MutableBrokerFilter implements Broker { return getNext().getBrokerService(); } + + public void messageExpired(ConnectionContext context,MessageReference message){ + getNext().messageExpired(context,message); + } + + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) { + getNext().sendToDeadLetterQueue(context,messageReference); + } + + public Broker getRoot(){ + return getNext().getRoot(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index d5d47d9db4..3452c86689 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -332,14 +332,15 @@ abstract public class AbstractRegion implements Region { // Try to auto create the destination... re-invoke broker from the // top so that the proper security checks are performed. try { + + context.getBroker().addDestination(context,destination); dest = addDestination(context, destination); - //context.getBroker().addDestination(context,destination); } catch (DestinationAlreadyExistsException e) { // if the destination already exists then lets ignore this error } // We should now have the dest created. - //dest=(Destination) destinations.get(destination); + dest=(Destination) destinations.get(destination); } if(dest==null){ throw new JMSException("The destination "+destination+" does not exist."); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index 15dac01740..15fa3e8d1a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -42,118 +42,121 @@ import org.apache.activemq.thread.TaskRunnerFactory; * @author fateev@amazon.com * @version $Revision$ */ -public class DestinationFactoryImpl extends DestinationFactory { +public class DestinationFactoryImpl extends DestinationFactory{ protected final UsageManager memoryManager; protected final TaskRunnerFactory taskRunnerFactory; protected final PersistenceAdapter persistenceAdapter; protected RegionBroker broker; - public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, - PersistenceAdapter persistenceAdapter) { - this.memoryManager = memoryManager; - this.taskRunnerFactory = taskRunnerFactory; - if (persistenceAdapter == null) { + public DestinationFactoryImpl(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, + PersistenceAdapter persistenceAdapter){ + this.memoryManager=memoryManager; + this.taskRunnerFactory=taskRunnerFactory; + if(persistenceAdapter==null){ throw new IllegalArgumentException("null persistenceAdapter"); } - this.persistenceAdapter = persistenceAdapter; + this.persistenceAdapter=persistenceAdapter; } - public void setRegionBroker(RegionBroker broker) { - if (broker == null) { + public void setRegionBroker(RegionBroker broker){ + if(broker==null){ throw new IllegalArgumentException("null broker"); } - this.broker = broker; + this.broker=broker; } - public Set getDestinations() { + public Set getDestinations(){ return persistenceAdapter.getDestinations(); } /** * @return instance of {@link Queue} or {@link Topic} */ - public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { - if (destination.isQueue()) { - if (destination.isTemporary()) { - final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; - return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) { - - public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { + public Destination createDestination(ConnectionContext context,ActiveMQDestination destination, + DestinationStatistics destinationStatistics) throws Exception{ + if(destination.isQueue()){ + if(destination.isTemporary()){ + final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination; + return new Queue(broker.getRoot(),destination,memoryManager,null, + destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){ + + public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{ // Only consumers on the same connection can consume from // the temporary destination - if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { + if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){ throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest); } - super.addSubscription(context, sub); + super.addSubscription(context,sub); }; }; - } else { - MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); - Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()); - configureQueue(queue, destination); + }else{ + MessageStore store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); + Queue queue=new Queue(broker.getRoot(),destination,memoryManager,store, + destinationStatistics,taskRunnerFactory,broker.getTempDataStore()); + configureQueue(queue,destination); queue.initialize(); return queue; } - } else if (destination.isTemporary()){ - final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; - return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) { - public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { + }else if(destination.isTemporary()){ + final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination; + return new Topic(broker.getRoot(),destination,null,memoryManager, + destinationStatistics,taskRunnerFactory){ + + public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{ // Only consumers on the same connection can consume from // the temporary destination - if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { + if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){ throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest); } - super.addSubscription(context, sub); + super.addSubscription(context,sub); }; }; - } else { - TopicMessageStore store = null; - if (!AdvisorySupport.isAdvisoryTopic(destination)) { - store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination); + }else{ + TopicMessageStore store=null; + if(!AdvisorySupport.isAdvisoryTopic(destination)){ + store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination); } - - Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory); - configureTopic(topic, destination); - + Topic topic=new Topic(broker.getRoot(),destination,store,memoryManager, + destinationStatistics,taskRunnerFactory); + configureTopic(topic,destination); return topic; } } - protected void configureQueue(Queue queue, ActiveMQDestination destination) { - if (broker == null) { + protected void configureQueue(Queue queue,ActiveMQDestination destination){ + if(broker==null){ throw new IllegalStateException("broker property is not set"); } - if (broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { + if(broker.getDestinationPolicy()!=null){ + PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); + if(entry!=null){ entry.configure(queue,broker.getTempDataStore()); } } } - protected void configureTopic(Topic topic, ActiveMQDestination destination) { - if (broker == null) { + protected void configureTopic(Topic topic,ActiveMQDestination destination){ + if(broker==null){ throw new IllegalStateException("broker property is not set"); } - if (broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { + if(broker.getDestinationPolicy()!=null){ + PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); + if(entry!=null){ entry.configure(topic); } } } - public long getLastMessageBrokerSequenceId() throws IOException { + public long getLastMessageBrokerSequenceId() throws IOException{ return persistenceAdapter.getLastMessageBrokerSequenceId(); } - public PersistenceAdapter getPersistenceAdapter() { + public PersistenceAdapter getPersistenceAdapter(){ return persistenceAdapter; } - public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { + public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException{ return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index c2b047f96c..23db44f6d5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -276,17 +276,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * @throws Exception */ protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{ - // Send the message to the DLQ - Message message=node.getMessage(); - if(message!=null){ - // The original destination and transaction id do not get filled when the message is first - // sent, - // it is only populated if the message is routed to another destination like the DLQ - DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); - ActiveMQDestination deadLetterDestination=deadLetterStrategy - .getDeadLetterQueueFor(message.getDestination()); - BrokerSupport.resend(context,message,deadLetterDestination); - } + broker.sendToDeadLetterQueue(context,node); } /** @@ -393,7 +383,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ // Message may have been sitting in the pending list a while // waiting for the consumer to ak the message. if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + broker.messageExpired(getContext(),node); + dequeueCounter++; + continue; } dispatch(node); count++; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 453a79c5d7..39b2a957f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -72,7 +73,6 @@ import org.apache.commons.logging.LogFactory; public class Queue implements Destination, Task { private final Log log; - private final ActiveMQDestination destination; private final List consumers = new CopyOnWriteArrayList(); private final Valve dispatchValve = new Valve(true); @@ -96,9 +96,11 @@ public class Queue implements Destination, Task { private final Object doDispatchMutex = new Object(); private TaskRunner taskRunner; private boolean started = false; + final Broker broker; - public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, + public Queue(Broker broker,ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { + this.broker=broker; this.destination = destination; this.usageManager = new UsageManager(memoryManager,destination.toString()); this.usageManager.setUsagePortion(1.0f); @@ -136,7 +138,8 @@ public class Queue implements Destination, Task { public void recoverMessage(Message message){ // Message could have expired while it was being loaded.. if(message.isExpired()){ - // TODO remove from store + broker.messageExpired(createConnectionContext(),message); + destinationStatistics.getMessages().decrement(); return; } message.setRegionDestination(Queue.this); @@ -342,9 +345,8 @@ public class Queue implements Destination, Task { // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if(message.isExpired()){ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } + broker.messageExpired(context,message); + destinationStatistics.getMessages().decrement(); if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) { ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -365,9 +367,8 @@ public class Queue implements Destination, Task { // While waiting for space to free up... the message may have expired. if(message.isExpired()){ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } + broker.messageExpired(context,message); + destinationStatistics.getMessages().decrement(); if( !message.isResponseRequired() && !context.isInRecoveryMode() ) { ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); @@ -440,10 +441,8 @@ public class Queue implements Destination, Task { // It could take while before we receive the commit // op, by that time the message could have expired.. if(message.isExpired()){ - // TODO: remove message from store. - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } + broker.messageExpired(context,message); + destinationStatistics.getMessages().decrement(); return; } sendMessage(context,message); @@ -1011,9 +1010,8 @@ public class Queue implements Destination, Task { result.add(node); count++; }else{ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + node); - } + broker.messageExpired(createConnectionContext(),node); + destinationStatistics.getMessages().decrement(); } } }finally{ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index b46c60d332..83454bf726 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -37,6 +37,7 @@ import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; @@ -62,6 +63,7 @@ import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceStopper; @@ -625,6 +627,52 @@ public class RegionBroker implements Broker { public BrokerService getBrokerService(){ return brokerService; } - - + + public void messageExpired(ConnectionContext context,MessageReference node){ + if(log.isDebugEnabled()){ + log.debug("Message expired "+node); + } + getRoot().sendToDeadLetterQueue(context,node); + } + + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference node){ + try{ + if(node!=null){ + Message message=node.getMessage(); + if(message!=null){ + DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); + if(deadLetterStrategy!=null){ + if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ + long expiration=message.getExpiration(); + message.setExpiration(0); + message.setProperty("originalExpiration",new Long(expiration)); + if(!message.isPersistent()){ + message.setPersistent(true); + message.setProperty("originalDeliveryMode","NON_PERSISTENT"); + } + // The original destination and transaction id do not get filled when the message is first + // sent, + // it is only populated if the message is routed to another destination like the DLQ + ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message + .getDestination()); + BrokerSupport.resend(context,message,deadLetterDestination); + } + } + }else{ + log.warn("Null message for node: "+node); + } + } + }catch(Exception e){ + log.warn("Failed to pass expired message to dead letter queue"); + } + } + + public Broker getRoot(){ + try{ + return getBrokerService().getBroker(); + }catch(Exception e){ + log.fatal("Trying to get Root Broker "+e); + throw new RuntimeException("The broker from the BrokerService should not throw an exception"); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index 05c7e98338..c40685ff6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -41,7 +41,7 @@ public class TempQueueRegion extends AbstractRegion { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; - return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) { + return new Queue(broker.getRoot(),destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 881bcba932..567e5fca26 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; @@ -72,10 +73,11 @@ public class Topic implements Destination { private boolean sendAdvisoryIfNoConsumers; private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap(); + final Broker broker; - public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, + public Topic(Broker broker,ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) { - + this.broker=broker; this.destination = destination; this.store = store; //this could be NULL! (If an advsiory) this.usageManager = new UsageManager(memoryManager,destination.toString()); @@ -261,9 +263,8 @@ public class Topic implements Destination { // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if( message.isExpired() ) { - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } + broker.messageExpired(context,message); + destinationStatistics.getMessages().decrement(); if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) { ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -285,9 +286,8 @@ public class Topic implements Destination { // While waiting for space to free up... the message may have expired. if(message.isExpired()){ - if (log.isDebugEnabled()) { - log.debug("Expired message: " + message); - } + broker.messageExpired(context,message); + destinationStatistics.getMessages().decrement(); if( !message.isResponseRequired() && !context.isInRecoveryMode() ) { ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); @@ -357,7 +357,9 @@ public class Topic implements Destination { // It could take while before we receive the commit // operration.. by that time the message could have expired.. if( message.isExpired() ) { - // TODO: remove message from store. + broker.messageExpired(context,message); + message.decrementReferenceCount(); + destinationStatistics.getMessages().decrement(); return; } dispatch(context, message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 96bc9e4ec2..507757c9cf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -103,12 +103,7 @@ public class TopicSubscription extends AbstractSubscription{ int messagesToEvict=oldMessages.length; for(int i=0;i