diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 09c7c85a68..100e14a065 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -408,19 +408,23 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, - Subscription subscription){ - super.sendToDeadLetterQueue(context, messageReference, subscription); - try { - if(!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); - Message payload = messageReference.getMessage().copy(); - payload.clearBody(); - fireAdvisory(context, topic,payload); + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + Subscription subscription) { + boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription); + if (wasDLQd) { + try { + if(!messageReference.isAdvisory()) { + ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); + Message payload = messageReference.getMessage().copy(); + payload.clearBody(); + fireAdvisory(context, topic,payload); + } + } catch (Exception e) { + handleFireFailure("add to DLQ", e); } - } catch (Exception e) { - handleFireFailure("add to DLQ", e); } + + return wasDLQd; } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index d6e6de8a9a..183e89a827 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; + import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -40,14 +41,14 @@ import org.apache.activemq.usage.Usage; /** * The Message Broker which routes messages, maintains subscriptions and * connections, acknowledges messages and handles transactions. - * - * + * + * */ public interface Broker extends Region, Service { /** * Get a Broker from the Broker Stack that is a particular class - * + * * @param type * @return */ @@ -70,7 +71,7 @@ public interface Broker extends Region, Service { /** * Remove a BrokerInfo - * + * * @param connection * @param info */ @@ -78,14 +79,14 @@ public interface Broker extends Region, Service { /** * A client is establishing a connection with the broker. - * + * * @throws Exception TODO */ void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception; /** * A client is disconnecting from the broker. - * + * * @param context the environment the operation is being executed under. * @param info * @param error null if the client requested the disconnect or the error @@ -96,7 +97,7 @@ public interface Broker extends Region, Service { /** * Adds a session. - * + * * @param context * @param info * @throws Exception TODO @@ -105,7 +106,7 @@ public interface Broker extends Region, Service { /** * Removes a session. - * + * * @param context * @param info * @throws Exception TODO @@ -114,18 +115,20 @@ public interface Broker extends Region, Service { /** * Adds a producer. - * + * * @param context the enviorment the operation is being executed under. * @throws Exception TODO */ + @Override void addProducer(ConnectionContext context, ProducerInfo info) throws Exception; /** * Removes a producer. - * + * * @param context the enviorment the operation is being executed under. * @throws Exception TODO */ + @Override void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception; /** @@ -142,7 +145,7 @@ public interface Broker extends Region, Service { /** * Gets a list of all the prepared xa transactions. - * + * * @param context transaction ids * @return * @throws Exception TODO @@ -151,7 +154,7 @@ public interface Broker extends Region, Service { /** * Starts a transaction. - * + * * @param context * @param xid * @throws Exception TODO @@ -160,7 +163,7 @@ public interface Broker extends Region, Service { /** * Prepares a transaction. Only valid for xa transactions. - * + * * @param context * @param xid * @return id @@ -170,7 +173,7 @@ public interface Broker extends Region, Service { /** * Rollsback a transaction. - * + * * @param context * @param xid * @throws Exception TODO @@ -180,7 +183,7 @@ public interface Broker extends Region, Service { /** * Commits a transaction. - * + * * @param context * @param xid * @param onePhase @@ -190,7 +193,7 @@ public interface Broker extends Region, Service { /** * Forgets a transaction. - * + * * @param context * @param transactionId * @throws Exception @@ -199,21 +202,21 @@ public interface Broker extends Region, Service { /** * Get the BrokerInfo's of any connected Brokers - * + * * @return array of peer BrokerInfos */ BrokerInfo[] getPeerBrokerInfos(); /** * Notify the Broker that a dispatch is going to happen - * + * * @param messageDispatch */ void preProcessDispatch(MessageDispatch messageDispatch); /** * Notify the Broker that a dispatch has happened - * + * * @param messageDispatch */ void postProcessDispatch(MessageDispatch messageDispatch); @@ -230,7 +233,7 @@ public interface Broker extends Region, Service { /** * Add and process a DestinationInfo object - * + * * @param context * @param info * @throws Exception @@ -239,7 +242,7 @@ public interface Broker extends Region, Service { /** * Remove and process a DestinationInfo object - * + * * @param context * @param info * @throws Exception @@ -260,7 +263,7 @@ public interface Broker extends Region, Service { /** * Sets the default administration connection context used when configuring * the broker on startup or via JMX - * + * * @param adminConnectionContext */ void setAdminConnectionContext(ConnectionContext adminConnectionContext); @@ -287,7 +290,7 @@ public interface Broker extends Region, Service { /** * Ensure we get the Broker at the top of the Stack - * + * * @return the broker at the top of the Stack */ Broker getRoot(); @@ -296,7 +299,7 @@ public interface Broker extends Region, Service { * Determine if a message has expired -allows default behaviour to be * overriden - as the timestamp set by the producer can be out of sync with * the broker - * + * * @param messageReference * @return true if the message is expired */ @@ -313,49 +316,51 @@ public interface Broker extends Region, Service { /** * A message needs to go the a DLQ - * + * * @param context * @param messageReference * @param subscription, may be null + * + * @return true if Message was placed in a DLQ false if discarded. */ - void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription); - + boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription); + /** * @return the broker sequence id */ long getBrokerSequenceId(); - + /** * called when message is consumed * @param context * @param messageReference */ void messageConsumed(ConnectionContext context, MessageReference messageReference); - + /** * Called when message is delivered to the broker * @param context * @param messageReference */ void messageDelivered(ConnectionContext context, MessageReference messageReference); - + /** * Called when a message is discarded - e.g. running low on memory * This will happen only if the policy is enabled - e.g. non durable topics * @param context - * @param sub + * @param sub * @param messageReference */ void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference); - + /** * Called when there is a slow consumer * @param context - * @param destination + * @param destination * @param subs */ void slowConsumer(ConnectionContext context,Destination destination, Subscription subs); - + /** * Called to notify a producer is too fast * @param context @@ -363,23 +368,23 @@ public interface Broker extends Region, Service { * @param destination */ void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination); - + /** * Called when a Usage reaches a limit * @param context - * @param destination + * @param destination * @param usage */ void isFull(ConnectionContext context,Destination destination,Usage usage); - + /** * called when the broker becomes the master in a master/slave * configuration */ void nowMasterBroker(); - + Scheduler getScheduler(); - + ThreadPoolExecutor getExecutor(); void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 0c842b3a1c..b59696933d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -47,8 +48,8 @@ import org.apache.activemq.usage.Usage; /** * Allows you to intercept broker operation so that features such as security * can be implemented as a pluggable filter. - * - * + * + * */ public class BrokerFilter implements Broker { @@ -58,6 +59,7 @@ public class BrokerFilter implements Broker { this.next = next; } + @Override public Broker getAdaptor(Class type) { if (type.isInstance(this)) { return this; @@ -65,257 +67,320 @@ public class BrokerFilter implements Broker { return next.getAdaptor(type); } + @Override public Map getDestinationMap() { return next.getDestinationMap(); } + @Override public Set getDestinations(ActiveMQDestination destination) { return next.getDestinations(destination); } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { next.acknowledge(consumerExchange, ack); } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { return next.messagePull(context, pull); } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { next.addConnection(context, info); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { return next.addConsumer(context, info); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.addProducer(context, info); } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { next.commitTransaction(context, xid, onePhase); } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { next.removeSubscription(context, info); } + @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { return next.getPreparedTransactions(context); } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { return next.prepareTransaction(context, xid); } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { next.removeConnection(context, info, error); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { next.removeConsumer(context, info); } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.removeProducer(context, info); } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { next.rollbackTransaction(context, xid); } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { next.send(producerExchange, messageSend); } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { next.beginTransaction(context, xid); } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { next.forgetTransaction(context, transactionId); } + @Override public Connection[] getClients() throws Exception { return next.getClients(); } + @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { return next.addDestination(context, destination,createIfTemporary); } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { next.removeDestination(context, destination, timeout); } + @Override public ActiveMQDestination[] getDestinations() throws Exception { return next.getDestinations(); } + @Override public void start() throws Exception { next.start(); } + @Override public void stop() throws Exception { next.stop(); } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { next.addSession(context, info); } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { next.removeSession(context, info); } + @Override public BrokerId getBrokerId() { return next.getBrokerId(); } + @Override public String getBrokerName() { return next.getBrokerName(); } + @Override public void gc() { next.gc(); } + @Override public void addBroker(Connection connection, BrokerInfo info) { next.addBroker(connection, info); } + @Override public void removeBroker(Connection connection, BrokerInfo info) { next.removeBroker(connection, info); } + @Override public BrokerInfo[] getPeerBrokerInfos() { return next.getPeerBrokerInfos(); } + @Override public void preProcessDispatch(MessageDispatch messageDispatch) { next.preProcessDispatch(messageDispatch); } + @Override public void postProcessDispatch(MessageDispatch messageDispatch) { next.postProcessDispatch(messageDispatch); } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { next.processDispatchNotification(messageDispatchNotification); } + @Override public boolean isStopped() { return next.isStopped(); } + @Override public Set getDurableDestinations() { return next.getDurableDestinations(); } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { next.addDestinationInfo(context, info); } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { next.removeDestinationInfo(context, info); } + @Override public boolean isFaultTolerantConfiguration() { return next.isFaultTolerantConfiguration(); } + @Override public ConnectionContext getAdminConnectionContext() { return next.getAdminConnectionContext(); } + @Override public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { next.setAdminConnectionContext(adminConnectionContext); } + @Override public PListStore getTempDataStore() { return next.getTempDataStore(); } + @Override public URI getVmConnectorURI() { return next.getVmConnectorURI(); } + @Override public void brokerServiceStarted() { next.brokerServiceStarted(); } + @Override public BrokerService getBrokerService() { return next.getBrokerService(); } + @Override public boolean isExpired(MessageReference messageReference) { return next.isExpired(messageReference); } + @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { next.messageExpired(context, message, subscription); } - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { - next.sendToDeadLetterQueue(context, messageReference, subscription); + return next.sendToDeadLetterQueue(context, messageReference, subscription); } + @Override public Broker getRoot() { return next.getRoot(); } + @Override public long getBrokerSequenceId() { return next.getBrokerSequenceId(); } - + + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { next.fastProducer(context, producerInfo, destination); } + @Override public void isFull(ConnectionContext context,Destination destination, Usage usage) { next.isFull(context,destination, usage); } + @Override public void messageConsumed(ConnectionContext context,MessageReference messageReference) { next.messageConsumed(context, messageReference); } + @Override public void messageDelivered(ConnectionContext context,MessageReference messageReference) { next.messageDelivered(context, messageReference); } + @Override public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) { next.messageDiscarded(context, sub, messageReference); } + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { next.slowConsumer(context, destination,subs); } - - public void nowMasterBroker() { + + @Override + public void nowMasterBroker() { next.nowMasterBroker(); } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { next.processConsumerControl(consumerExchange, control); } + @Override public Scheduler getScheduler() { return next.getScheduler(); } + @Override public ThreadPoolExecutor getExecutor() { return next.getExecutor(); } + @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); } + @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { next.networkBridgeStopped(brokerInfo); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index a6b4f03b0d..3fe2236339 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -47,19 +48,22 @@ import org.apache.activemq.usage.Usage; /** * Dumb implementation - used to be overriden by listeners - * - * + * + * */ public class EmptyBroker implements Broker { + @Override public BrokerId getBrokerId() { return null; } + @Override public String getBrokerName() { return null; } + @Override public Broker getAdaptor(Class type) { if (type.isInstance(this)) { return this; @@ -67,237 +71,298 @@ public class EmptyBroker implements Broker { return null; } + @Override @SuppressWarnings("unchecked") public Map getDestinationMap() { return Collections.EMPTY_MAP; } + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { } + @Override public Connection[] getClients() throws Exception { return null; } + @Override public ActiveMQDestination[] getDestinations() throws Exception { return null; } + @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { return null; } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { return 0; } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { } + @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception { return null; } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { return null; } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { } + @Override public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { } + @Override public void gc() { } + @Override public void start() throws Exception { } + @Override public void stop() throws Exception { } + @Override public void addBroker(Connection connection, BrokerInfo info) { } + @Override public void removeBroker(Connection connection, BrokerInfo info) { } + @Override public BrokerInfo[] getPeerBrokerInfos() { return null; } + @Override public void preProcessDispatch(MessageDispatch messageDispatch) { } + @Override public void postProcessDispatch(MessageDispatch messageDispatch) { } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { } + @Override public boolean isStopped() { return false; } + @Override public Set getDurableDestinations() { return null; } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { } + @Override public boolean isFaultTolerantConfiguration() { return false; } + @Override public ConnectionContext getAdminConnectionContext() { return null; } + @Override public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { return null; } + @Override public PListStore getTempDataStore() { return null; } + @Override public URI getVmConnectorURI() { return null; } + @Override public void brokerServiceStarted() { } + @Override public BrokerService getBrokerService() { return null; } + @Override public boolean isExpired(MessageReference messageReference) { return false; } + @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { } - public void sendToDeadLetterQueue(ConnectionContext context, - MessageReference messageReference, - Subscription subscription) { + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + Subscription subscription) { + return false; } + @Override public Broker getRoot() { return null; } - + + @Override public long getBrokerSequenceId() { return -1l; } - + + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { } + @Override public void isFull(ConnectionContext context, Destination destination,Usage usage) { } + @Override public void messageConsumed(ConnectionContext context,MessageReference messageReference) { } + @Override public void messageDelivered(ConnectionContext context,MessageReference messageReference) { } + @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { } + @Override public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) { } - public void nowMasterBroker() { + @Override + public void nowMasterBroker() { } + @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { } + @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, - ConsumerControl control) { + ConsumerControl control) { } + @Override public Scheduler getScheduler() { return null; } + @Override public ThreadPoolExecutor getExecutor() { return null; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 573601438c..ceec09b196 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -48,8 +49,8 @@ import org.apache.activemq.usage.Usage; /** * Implementation of the broker where all it's methods throw an * BrokerStoppedException. - * - * + * + * */ public class ErrorBroker implements Broker { @@ -59,15 +60,18 @@ public class ErrorBroker implements Broker { this.message = message; } + @Override @SuppressWarnings("unchecked") public Map getDestinationMap() { return Collections.EMPTY_MAP; } + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; } + @Override public Broker getAdaptor(Class type) { if (type.isInstance(this)) { return this; @@ -75,249 +79,310 @@ public class ErrorBroker implements Broker { return null; } + @Override public BrokerId getBrokerId() { throw new BrokerStoppedException(this.message); } + @Override public String getBrokerName() { throw new BrokerStoppedException(this.message); } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public Connection[] getClients() throws Exception { throw new BrokerStoppedException(this.message); } + @Override public ActiveMQDestination[] getDestinations() throws Exception { throw new BrokerStoppedException(this.message); } + @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void gc() { throw new BrokerStoppedException(this.message); } + @Override public void start() throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void stop() throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void addBroker(Connection connection, BrokerInfo info) { throw new BrokerStoppedException(this.message); } + @Override public void removeBroker(Connection connection, BrokerInfo info) { throw new BrokerStoppedException(this.message); } + @Override public BrokerInfo[] getPeerBrokerInfos() { throw new BrokerStoppedException(this.message); } + @Override public void preProcessDispatch(MessageDispatch messageDispatch) { throw new BrokerStoppedException(this.message); } + @Override public void postProcessDispatch(MessageDispatch messageDispatch) { throw new BrokerStoppedException(this.message); } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public boolean isStopped() { return true; } + @Override public Set getDurableDestinations() { throw new BrokerStoppedException(this.message); } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { throw new BrokerStoppedException(this.message); } + @Override public boolean isFaultTolerantConfiguration() { throw new BrokerStoppedException(this.message); } + @Override public ConnectionContext getAdminConnectionContext() { throw new BrokerStoppedException(this.message); } + @Override public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { throw new BrokerStoppedException(this.message); } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) { throw new BrokerStoppedException(this.message); } + @Override public PListStore getTempDataStore() { throw new BrokerStoppedException(this.message); } + @Override public URI getVmConnectorURI() { throw new BrokerStoppedException(this.message); } + @Override public void brokerServiceStarted() { throw new BrokerStoppedException(this.message); } + @Override public BrokerService getBrokerService() { throw new BrokerStoppedException(this.message); } + @Override public boolean isExpired(MessageReference messageReference) { throw new BrokerStoppedException(this.message); } + @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { throw new BrokerStoppedException(this.message); } - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, - Subscription subscription) { + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + Subscription subscription) { throw new BrokerStoppedException(this.message); } + @Override public Broker getRoot() { throw new BrokerStoppedException(this.message); } - + + @Override public long getBrokerSequenceId() { throw new BrokerStoppedException(this.message); } - + + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { throw new BrokerStoppedException(this.message); } + @Override public void isFull(ConnectionContext context,Destination destination, Usage usage) { throw new BrokerStoppedException(this.message); } + @Override public void messageConsumed(ConnectionContext context,MessageReference messageReference) { throw new BrokerStoppedException(this.message); } + @Override public void messageDelivered(ConnectionContext context,MessageReference messageReference) { throw new BrokerStoppedException(this.message); } + @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { throw new BrokerStoppedException(this.message); } + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { throw new BrokerStoppedException(this.message); } - - public void nowMasterBroker() { + + @Override + public void nowMasterBroker() { throw new BrokerStoppedException(this.message); } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { throw new BrokerStoppedException(this.message); } + @Override public Scheduler getScheduler() { throw new BrokerStoppedException(this.message); } + @Override public ThreadPoolExecutor getExecutor() { throw new BrokerStoppedException(this.message); } + @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { throw new BrokerStoppedException(this.message); } + @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { throw new BrokerStoppedException(this.message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index e4cd9b0ddb..25def6ca75 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -49,8 +50,8 @@ import org.apache.activemq.usage.Usage; * Like a BrokerFilter but it allows you to switch the getNext().broker. This * has more overhead than a BrokerFilter since access to the getNext().broker * has to synchronized since it is mutable - * - * + * + * */ public class MutableBrokerFilter implements Broker { @@ -60,6 +61,7 @@ public class MutableBrokerFilter implements Broker { this.next.set(next); } + @Override public Broker getAdaptor(Class type) { if (type.isInstance(this)) { return this; @@ -72,261 +74,324 @@ public class MutableBrokerFilter implements Broker { } public void setNext(Broker next) { - this.next.set(next); + this.next.set(next); } + @Override public Map getDestinationMap() { return getNext().getDestinationMap(); } + @Override public Set getDestinations(ActiveMQDestination destination) { return getNext().getDestinations(destination); } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { getNext().acknowledge(consumerExchange, ack); } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { getNext().addConnection(context, info); } + @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { return getNext().addConsumer(context, info); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { getNext().addProducer(context, info); } + @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { getNext().commitTransaction(context, xid, onePhase); } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { getNext().removeSubscription(context, info); } + @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { return getNext().getPreparedTransactions(context); } + @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { return getNext().prepareTransaction(context, xid); } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { getNext().removeConnection(context, info, error); } + @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { getNext().removeConsumer(context, info); } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { getNext().removeProducer(context, info); } + @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { getNext().rollbackTransaction(context, xid); } + @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { getNext().send(producerExchange, messageSend); } + @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { getNext().beginTransaction(context, xid); } + @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { getNext().forgetTransaction(context, transactionId); } + @Override public Connection[] getClients() throws Exception { return getNext().getClients(); } + @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { return getNext().addDestination(context, destination,createIfTemporary); } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { getNext().removeDestination(context, destination, timeout); } + @Override public ActiveMQDestination[] getDestinations() throws Exception { return getNext().getDestinations(); } + @Override public void start() throws Exception { getNext().start(); } + @Override public void stop() throws Exception { getNext().stop(); } + @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { getNext().addSession(context, info); } + @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { getNext().removeSession(context, info); } + @Override public BrokerId getBrokerId() { return getNext().getBrokerId(); } + @Override public String getBrokerName() { return getNext().getBrokerName(); } + @Override public void gc() { getNext().gc(); } + @Override public void addBroker(Connection connection, BrokerInfo info) { getNext().addBroker(connection, info); } + @Override public void removeBroker(Connection connection, BrokerInfo info) { getNext().removeBroker(connection, info); } + @Override public BrokerInfo[] getPeerBrokerInfos() { return getNext().getPeerBrokerInfos(); } + @Override public void preProcessDispatch(MessageDispatch messageDispatch) { getNext().preProcessDispatch(messageDispatch); } + @Override public void postProcessDispatch(MessageDispatch messageDispatch) { getNext().postProcessDispatch(messageDispatch); } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { getNext().processDispatchNotification(messageDispatchNotification); } + @Override public boolean isStopped() { return getNext().isStopped(); } + @Override public Set getDurableDestinations() { return getNext().getDurableDestinations(); } + @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { getNext().addDestinationInfo(context, info); } + @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { getNext().removeDestinationInfo(context, info); } + @Override public boolean isFaultTolerantConfiguration() { return getNext().isFaultTolerantConfiguration(); } + @Override public ConnectionContext getAdminConnectionContext() { return getNext().getAdminConnectionContext(); } + @Override public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { getNext().setAdminConnectionContext(adminConnectionContext); } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { return getNext().messagePull(context, pull); } + @Override public PListStore getTempDataStore() { return getNext().getTempDataStore(); } + @Override public URI getVmConnectorURI() { return getNext().getVmConnectorURI(); } + @Override public void brokerServiceStarted() { getNext().brokerServiceStarted(); } + @Override public BrokerService getBrokerService() { return getNext().getBrokerService(); } + @Override public boolean isExpired(MessageReference messageReference) { return getNext().isExpired(messageReference); } + @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { getNext().messageExpired(context, message, subscription); } - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, - Subscription subscription) { - getNext().sendToDeadLetterQueue(context, messageReference, subscription); + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + Subscription subscription) { + return getNext().sendToDeadLetterQueue(context, messageReference, subscription); } + @Override public Broker getRoot() { return getNext().getRoot(); } - + + @Override public long getBrokerSequenceId() { return getNext().getBrokerSequenceId(); } - + + @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { getNext().fastProducer(context, producerInfo, destination); } + @Override public void isFull(ConnectionContext context,Destination destination, Usage usage) { getNext().isFull(context,destination, usage); } + @Override public void messageConsumed(ConnectionContext context,MessageReference messageReference) { getNext().messageConsumed(context, messageReference); } + @Override public void messageDelivered(ConnectionContext context,MessageReference messageReference) { getNext().messageDelivered(context, messageReference); } + @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { getNext().messageDiscarded(context, sub, messageReference); } + @Override public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) { getNext().slowConsumer(context, dest,subs); } - - public void nowMasterBroker() { + + @Override + public void nowMasterBroker() { getNext().nowMasterBroker(); } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { getNext().processConsumerControl(consumerExchange, control); } + @Override public Scheduler getScheduler() { return getNext().getScheduler(); } + @Override public ThreadPoolExecutor getExecutor() { return getNext().getExecutor(); } + @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); } + @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { getNext().networkBridgeStopped(brokerInfo); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index a0f273889b..228986cda0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -701,7 +701,7 @@ public class RegionBroker extends EmptyBroker { } @Override - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) { + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) { try { if (node != null) { Message message = node.getMessage(); @@ -726,6 +726,7 @@ public class RegionBroker extends EmptyBroker { context.setBroker(getRoot()); } BrokerSupport.resendNoCopy(context, message, deadLetterDestination); + return true; } } else { if (LOG.isDebugEnabled()) { @@ -738,6 +739,8 @@ public class RegionBroker extends EmptyBroker { } catch (Exception e) { LOG.warn("Caught an exception sending to DLQ: " + node, e); } + + return false; } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index 4f9b6d676b..bbf5274933 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -17,7 +17,9 @@ package org.apache.activemq.broker.util; import java.util.Set; + import javax.annotation.PostConstruct; + import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; @@ -497,7 +499,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; @@ -506,7 +508,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { LOG.info("Sending to DLQ : " + msg); } - super.sendToDeadLetterQueue(context, messageReference, subscription); + return super.sendToDeadLetterQueue(context, messageReference, subscription); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java index 05a55727e9..fc37609bb1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.util; import java.io.IOException; + import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.broker.Broker; @@ -126,10 +127,10 @@ public class RedeliveryPlugin extends BrokerPluginSupport { } @Override - public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { if (messageReference.isExpired()) { // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages - super.sendToDeadLetterQueue(context, messageReference, subscription); + return super.sendToDeadLetterQueue(context, messageReference, subscription); } else { try { Destination regionDestination = (Destination) messageReference.getRegionDestination(); @@ -145,15 +146,17 @@ public class RedeliveryPlugin extends BrokerPluginSupport { scheduleRedelivery(context, messageReference, delay, ++redeliveryCount); } else if (isSendToDlqIfMaxRetriesExceeded()) { - super.sendToDeadLetterQueue(context, messageReference, subscription); + return super.sendToDeadLetterQueue(context, messageReference, subscription); } else { LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId()); } } else if (isFallbackToDeadLetter()) { - super.sendToDeadLetterQueue(context, messageReference, subscription); + return super.sendToDeadLetterQueue(context, messageReference, subscription); } else { LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + regionDestination.getActiveMQDestination()); } + + return false; } catch (Exception exception) { // abort the ack, will be effective if client use transactions or individual ack with sync send RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception); @@ -203,5 +206,4 @@ public class RedeliveryPlugin extends BrokerPluginSupport { } return 0; } - } diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java index cde3a9ad86..e8e82ab3ec 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.plugin; +package org.apache.activemq.plugin; import java.util.regex.Pattern; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; @@ -28,8 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author Filip Hanik - * @version 1.0 */ public class DiscardingDLQBroker extends BrokerFilter { public static Logger log = LoggerFactory.getLogger(DiscardingDLQBroker.class); @@ -45,8 +44,7 @@ public class DiscardingDLQBroker extends BrokerFilter { } @Override - public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, - Subscription subscription) { + public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription) { if (log.isTraceEnabled()) { log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); } @@ -58,35 +56,38 @@ public class DiscardingDLQBroker extends BrokerFilter { dest = msg.getDestination(); destName = dest.getPhysicalName(); - if (dest == null || destName == null ) { - //do nothing, no need to forward it - skipMessage("NULL DESTINATION",msgRef); + if (dest == null || destName == null) { + // do nothing, no need to forward it + skipMessage("NULL DESTINATION", msgRef); } else if (dropAll) { - //do nothing - skipMessage("dropAll",msgRef); + // do nothing + skipMessage("dropAll", msgRef); } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) { - //do nothing - skipMessage("dropTemporaryTopics",msgRef); + // do nothing + skipMessage("dropTemporaryTopics", msgRef); } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) { - //do nothing - skipMessage("dropTemporaryQueues",msgRef); - } else if (destFilter!=null && matches(destName)) { - //do nothing - skipMessage("dropOnly",msgRef); + // do nothing + skipMessage("dropTemporaryQueues", msgRef); + } else if (destFilter != null && matches(destName)) { + // do nothing + skipMessage("dropOnly", msgRef); } else { dropped = false; - next.sendToDeadLetterQueue(ctx, msgRef, subscription); + return next.sendToDeadLetterQueue(ctx, msgRef, subscription); } - if (dropped && getReportInterval()>0) { - if ((++dropCount)%getReportInterval() == 0 ) { - log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue"); + + if (dropped && getReportInterval() > 0) { + if ((++dropCount) % getReportInterval() == 0) { + log.info("Total of " + dropCount + " messages were discarded, since their destination was the dead letter queue"); } } + + return false; } public boolean matches(String destName) { - for (int i=0; destFilter!=null && i"); + MessageConsumer consumer = session.createConsumer(dlqDestination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + advised.set(true); + } + }); + connection.start(); + + ExecutorService service = Executors.newSingleThreadExecutor(); + + service.execute(new Runnable() { + @Override + public void run() { + try { + ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTemporaryQueue(); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.setTimeToLive(400); + producer.send(session.createTextMessage()); + producer.send(session.createTextMessage()); + TimeUnit.MILLISECONDS.sleep(500); + connection.close(); + } catch (Exception e) { + } + } + }); + + service.shutdown(); + assertTrue(service.awaitTermination(1, TimeUnit.MINUTES)); + assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get()); + } +}