Don't send MessageDLQd advisory for Message that aren't sent to a DLQ

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-07 21:17:34 +00:00
parent 5a299b76eb
commit 0bb9013769
11 changed files with 521 additions and 117 deletions

View File

@ -408,19 +408,23 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription){
super.sendToDeadLetterQueue(context, messageReference, subscription);
try {
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription);
if (wasDLQd) {
try {
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic,payload);
}
} catch (Exception e) {
handleFireFailure("add to DLQ", e);
}
} catch (Exception e) {
handleFireFailure("add to DLQ", e);
}
return wasDLQd;
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import java.net.URI;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@ -40,14 +41,14 @@ import org.apache.activemq.usage.Usage;
/**
* The Message Broker which routes messages, maintains subscriptions and
* connections, acknowledges messages and handles transactions.
*
*
*
*
*/
public interface Broker extends Region, Service {
/**
* Get a Broker from the Broker Stack that is a particular class
*
*
* @param type
* @return
*/
@ -70,7 +71,7 @@ public interface Broker extends Region, Service {
/**
* Remove a BrokerInfo
*
*
* @param connection
* @param info
*/
@ -78,14 +79,14 @@ public interface Broker extends Region, Service {
/**
* A client is establishing a connection with the broker.
*
*
* @throws Exception TODO
*/
void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
/**
* A client is disconnecting from the broker.
*
*
* @param context the environment the operation is being executed under.
* @param info
* @param error null if the client requested the disconnect or the error
@ -96,7 +97,7 @@ public interface Broker extends Region, Service {
/**
* Adds a session.
*
*
* @param context
* @param info
* @throws Exception TODO
@ -105,7 +106,7 @@ public interface Broker extends Region, Service {
/**
* Removes a session.
*
*
* @param context
* @param info
* @throws Exception TODO
@ -114,18 +115,20 @@ public interface Broker extends Region, Service {
/**
* Adds a producer.
*
*
* @param context the enviorment the operation is being executed under.
* @throws Exception TODO
*/
@Override
void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
/**
* Removes a producer.
*
*
* @param context the enviorment the operation is being executed under.
* @throws Exception TODO
*/
@Override
void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
/**
@ -142,7 +145,7 @@ public interface Broker extends Region, Service {
/**
* Gets a list of all the prepared xa transactions.
*
*
* @param context transaction ids
* @return
* @throws Exception TODO
@ -151,7 +154,7 @@ public interface Broker extends Region, Service {
/**
* Starts a transaction.
*
*
* @param context
* @param xid
* @throws Exception TODO
@ -160,7 +163,7 @@ public interface Broker extends Region, Service {
/**
* Prepares a transaction. Only valid for xa transactions.
*
*
* @param context
* @param xid
* @return id
@ -170,7 +173,7 @@ public interface Broker extends Region, Service {
/**
* Rollsback a transaction.
*
*
* @param context
* @param xid
* @throws Exception TODO
@ -180,7 +183,7 @@ public interface Broker extends Region, Service {
/**
* Commits a transaction.
*
*
* @param context
* @param xid
* @param onePhase
@ -190,7 +193,7 @@ public interface Broker extends Region, Service {
/**
* Forgets a transaction.
*
*
* @param context
* @param transactionId
* @throws Exception
@ -199,21 +202,21 @@ public interface Broker extends Region, Service {
/**
* Get the BrokerInfo's of any connected Brokers
*
*
* @return array of peer BrokerInfos
*/
BrokerInfo[] getPeerBrokerInfos();
/**
* Notify the Broker that a dispatch is going to happen
*
*
* @param messageDispatch
*/
void preProcessDispatch(MessageDispatch messageDispatch);
/**
* Notify the Broker that a dispatch has happened
*
*
* @param messageDispatch
*/
void postProcessDispatch(MessageDispatch messageDispatch);
@ -230,7 +233,7 @@ public interface Broker extends Region, Service {
/**
* Add and process a DestinationInfo object
*
*
* @param context
* @param info
* @throws Exception
@ -239,7 +242,7 @@ public interface Broker extends Region, Service {
/**
* Remove and process a DestinationInfo object
*
*
* @param context
* @param info
* @throws Exception
@ -260,7 +263,7 @@ public interface Broker extends Region, Service {
/**
* Sets the default administration connection context used when configuring
* the broker on startup or via JMX
*
*
* @param adminConnectionContext
*/
void setAdminConnectionContext(ConnectionContext adminConnectionContext);
@ -287,7 +290,7 @@ public interface Broker extends Region, Service {
/**
* Ensure we get the Broker at the top of the Stack
*
*
* @return the broker at the top of the Stack
*/
Broker getRoot();
@ -296,7 +299,7 @@ public interface Broker extends Region, Service {
* Determine if a message has expired -allows default behaviour to be
* overriden - as the timestamp set by the producer can be out of sync with
* the broker
*
*
* @param messageReference
* @return true if the message is expired
*/
@ -313,49 +316,51 @@ public interface Broker extends Region, Service {
/**
* A message needs to go the a DLQ
*
*
* @param context
* @param messageReference
* @param subscription, may be null
*
* @return true if Message was placed in a DLQ false if discarded.
*/
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
/**
* @return the broker sequence id
*/
long getBrokerSequenceId();
/**
* called when message is consumed
* @param context
* @param messageReference
*/
void messageConsumed(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is delivered to the broker
* @param context
* @param messageReference
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* @param context
* @param sub
* @param sub
* @param messageReference
*/
void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
/**
* Called when there is a slow consumer
* @param context
* @param destination
* @param destination
* @param subs
*/
void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
/**
* Called to notify a producer is too fast
* @param context
@ -363,23 +368,23 @@ public interface Broker extends Region, Service {
* @param destination
*/
void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
/**
* Called when a Usage reaches a limit
* @param context
* @param destination
* @param destination
* @param usage
*/
void isFull(ConnectionContext context,Destination destination,Usage usage);
/**
* called when the broker becomes the master in a master/slave
* configuration
*/
void nowMasterBroker();
Scheduler getScheduler();
ThreadPoolExecutor getExecutor();
void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp);

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -47,8 +48,8 @@ import org.apache.activemq.usage.Usage;
/**
* Allows you to intercept broker operation so that features such as security
* can be implemented as a pluggable filter.
*
*
*
*
*/
public class BrokerFilter implements Broker {
@ -58,6 +59,7 @@ public class BrokerFilter implements Broker {
this.next = next;
}
@Override
public Broker getAdaptor(Class type) {
if (type.isInstance(this)) {
return this;
@ -65,257 +67,320 @@ public class BrokerFilter implements Broker {
return next.getAdaptor(type);
}
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return next.getDestinationMap();
}
@Override
public Set <Destination>getDestinations(ActiveMQDestination destination) {
return next.getDestinations(destination);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
next.acknowledge(consumerExchange, ack);
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return next.messagePull(context, pull);
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info);
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return next.addConsumer(context, info);
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
next.commitTransaction(context, xid, onePhase);
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
next.removeSubscription(context, info);
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
return next.getPreparedTransactions(context);
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
return next.prepareTransaction(context, xid);
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
next.removeConnection(context, info, error);
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.removeConsumer(context, info);
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info);
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.rollbackTransaction(context, xid);
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
next.send(producerExchange, messageSend);
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.beginTransaction(context, xid);
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
next.forgetTransaction(context, transactionId);
}
@Override
public Connection[] getClients() throws Exception {
return next.getClients();
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
return next.addDestination(context, destination,createIfTemporary);
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
}
@Override
public ActiveMQDestination[] getDestinations() throws Exception {
return next.getDestinations();
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
next.addSession(context, info);
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
next.removeSession(context, info);
}
@Override
public BrokerId getBrokerId() {
return next.getBrokerId();
}
@Override
public String getBrokerName() {
return next.getBrokerName();
}
@Override
public void gc() {
next.gc();
}
@Override
public void addBroker(Connection connection, BrokerInfo info) {
next.addBroker(connection, info);
}
@Override
public void removeBroker(Connection connection, BrokerInfo info) {
next.removeBroker(connection, info);
}
@Override
public BrokerInfo[] getPeerBrokerInfos() {
return next.getPeerBrokerInfos();
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
next.preProcessDispatch(messageDispatch);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
next.postProcessDispatch(messageDispatch);
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
@Override
public boolean isStopped() {
return next.isStopped();
}
@Override
public Set<ActiveMQDestination> getDurableDestinations() {
return next.getDurableDestinations();
}
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
next.addDestinationInfo(context, info);
}
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
next.removeDestinationInfo(context, info);
}
@Override
public boolean isFaultTolerantConfiguration() {
return next.isFaultTolerantConfiguration();
}
@Override
public ConnectionContext getAdminConnectionContext() {
return next.getAdminConnectionContext();
}
@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
next.setAdminConnectionContext(adminConnectionContext);
}
@Override
public PListStore getTempDataStore() {
return next.getTempDataStore();
}
@Override
public URI getVmConnectorURI() {
return next.getVmConnectorURI();
}
@Override
public void brokerServiceStarted() {
next.brokerServiceStarted();
}
@Override
public BrokerService getBrokerService() {
return next.getBrokerService();
}
@Override
public boolean isExpired(MessageReference messageReference) {
return next.isExpired(messageReference);
}
@Override
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
next.messageExpired(context, message, subscription);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
next.sendToDeadLetterQueue(context, messageReference, subscription);
return next.sendToDeadLetterQueue(context, messageReference, subscription);
}
@Override
public Broker getRoot() {
return next.getRoot();
}
@Override
public long getBrokerSequenceId() {
return next.getBrokerSequenceId();
}
@Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
next.fastProducer(context, producerInfo, destination);
}
@Override
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
next.isFull(context,destination, usage);
}
@Override
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
next.messageDiscarded(context, sub, messageReference);
}
@Override
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
next.slowConsumer(context, destination,subs);
}
public void nowMasterBroker() {
@Override
public void nowMasterBroker() {
next.nowMasterBroker();
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
next.processConsumerControl(consumerExchange, control);
}
@Override
public Scheduler getScheduler() {
return next.getScheduler();
}
@Override
public ThreadPoolExecutor getExecutor() {
return next.getExecutor();
}
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
}
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
next.networkBridgeStopped(brokerInfo);
}

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -47,19 +48,22 @@ import org.apache.activemq.usage.Usage;
/**
* Dumb implementation - used to be overriden by listeners
*
*
*
*
*/
public class EmptyBroker implements Broker {
@Override
public BrokerId getBrokerId() {
return null;
}
@Override
public String getBrokerName() {
return null;
}
@Override
public Broker getAdaptor(Class type) {
if (type.isInstance(this)) {
return this;
@ -67,237 +71,298 @@ public class EmptyBroker implements Broker {
return null;
}
@Override
@SuppressWarnings("unchecked")
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return Collections.EMPTY_MAP;
}
@Override
public Set getDestinations(ActiveMQDestination destination) {
return Collections.EMPTY_SET;
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
}
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
}
@Override
public Connection[] getClients() throws Exception {
return null;
}
@Override
public ActiveMQDestination[] getDestinations() throws Exception {
return null;
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
return null;
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
return 0;
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
return null;
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return null;
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
}
@Override
public void gc() {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public void addBroker(Connection connection, BrokerInfo info) {
}
@Override
public void removeBroker(Connection connection, BrokerInfo info) {
}
@Override
public BrokerInfo[] getPeerBrokerInfos() {
return null;
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Set<ActiveMQDestination> getDurableDestinations() {
return null;
}
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
}
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
}
@Override
public boolean isFaultTolerantConfiguration() {
return false;
}
@Override
public ConnectionContext getAdminConnectionContext() {
return null;
}
@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}
@Override
public PListStore getTempDataStore() {
return null;
}
@Override
public URI getVmConnectorURI() {
return null;
}
@Override
public void brokerServiceStarted() {
}
@Override
public BrokerService getBrokerService() {
return null;
}
@Override
public boolean isExpired(MessageReference messageReference) {
return false;
}
@Override
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
}
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference,
Subscription subscription) {
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
return false;
}
@Override
public Broker getRoot() {
return null;
}
@Override
public long getBrokerSequenceId() {
return -1l;
}
@Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
}
@Override
public void isFull(ConnectionContext context, Destination destination,Usage usage) {
}
@Override
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
}
@Override
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
}
@Override
public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
}
public void nowMasterBroker() {
@Override
public void nowMasterBroker() {
}
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
}
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
ConsumerControl control) {
}
@Override
public Scheduler getScheduler() {
return null;
}
@Override
public ThreadPoolExecutor getExecutor() {
return null;
}

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -48,8 +49,8 @@ import org.apache.activemq.usage.Usage;
/**
* Implementation of the broker where all it's methods throw an
* BrokerStoppedException.
*
*
*
*
*/
public class ErrorBroker implements Broker {
@ -59,15 +60,18 @@ public class ErrorBroker implements Broker {
this.message = message;
}
@Override
@SuppressWarnings("unchecked")
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return Collections.EMPTY_MAP;
}
@Override
public Set getDestinations(ActiveMQDestination destination) {
return Collections.EMPTY_SET;
}
@Override
public Broker getAdaptor(Class type) {
if (type.isInstance(this)) {
return this;
@ -75,249 +79,310 @@ public class ErrorBroker implements Broker {
return null;
}
@Override
public BrokerId getBrokerId() {
throw new BrokerStoppedException(this.message);
}
@Override
public String getBrokerName() {
throw new BrokerStoppedException(this.message);
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public Connection[] getClients() throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public ActiveMQDestination[] getDestinations() throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void gc() {
throw new BrokerStoppedException(this.message);
}
@Override
public void start() throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void stop() throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void addBroker(Connection connection, BrokerInfo info) {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeBroker(Connection connection, BrokerInfo info) {
throw new BrokerStoppedException(this.message);
}
@Override
public BrokerInfo[] getPeerBrokerInfos() {
throw new BrokerStoppedException(this.message);
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
throw new BrokerStoppedException(this.message);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
throw new BrokerStoppedException(this.message);
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public boolean isStopped() {
return true;
}
@Override
public Set<ActiveMQDestination> getDurableDestinations() {
throw new BrokerStoppedException(this.message);
}
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
throw new BrokerStoppedException(this.message);
}
@Override
public boolean isFaultTolerantConfiguration() {
throw new BrokerStoppedException(this.message);
}
@Override
public ConnectionContext getAdminConnectionContext() {
throw new BrokerStoppedException(this.message);
}
@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
throw new BrokerStoppedException(this.message);
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
@Override
public PListStore getTempDataStore() {
throw new BrokerStoppedException(this.message);
}
@Override
public URI getVmConnectorURI() {
throw new BrokerStoppedException(this.message);
}
@Override
public void brokerServiceStarted() {
throw new BrokerStoppedException(this.message);
}
@Override
public BrokerService getBrokerService() {
throw new BrokerStoppedException(this.message);
}
@Override
public boolean isExpired(MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
throw new BrokerStoppedException(this.message);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
throw new BrokerStoppedException(this.message);
}
@Override
public Broker getRoot() {
throw new BrokerStoppedException(this.message);
}
@Override
public long getBrokerSequenceId() {
throw new BrokerStoppedException(this.message);
}
@Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
throw new BrokerStoppedException(this.message);
}
@Override
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
@Override
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
}
public void nowMasterBroker() {
@Override
public void nowMasterBroker() {
throw new BrokerStoppedException(this.message);
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
throw new BrokerStoppedException(this.message);
}
@Override
public Scheduler getScheduler() {
throw new BrokerStoppedException(this.message);
}
@Override
public ThreadPoolExecutor getExecutor() {
throw new BrokerStoppedException(this.message);
}
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
throw new BrokerStoppedException(this.message);
}
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
throw new BrokerStoppedException(this.message);
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -49,8 +50,8 @@ import org.apache.activemq.usage.Usage;
* Like a BrokerFilter but it allows you to switch the getNext().broker. This
* has more overhead than a BrokerFilter since access to the getNext().broker
* has to synchronized since it is mutable
*
*
*
*
*/
public class MutableBrokerFilter implements Broker {
@ -60,6 +61,7 @@ public class MutableBrokerFilter implements Broker {
this.next.set(next);
}
@Override
public Broker getAdaptor(Class type) {
if (type.isInstance(this)) {
return this;
@ -72,261 +74,324 @@ public class MutableBrokerFilter implements Broker {
}
public void setNext(Broker next) {
this.next.set(next);
this.next.set(next);
}
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return getNext().getDestinationMap();
}
@Override
public Set getDestinations(ActiveMQDestination destination) {
return getNext().getDestinations(destination);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
getNext().acknowledge(consumerExchange, ack);
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
getNext().addConnection(context, info);
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return getNext().addConsumer(context, info);
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
getNext().addProducer(context, info);
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
getNext().commitTransaction(context, xid, onePhase);
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
getNext().removeSubscription(context, info);
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
return getNext().getPreparedTransactions(context);
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
return getNext().prepareTransaction(context, xid);
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
getNext().removeConnection(context, info, error);
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
getNext().removeConsumer(context, info);
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
getNext().removeProducer(context, info);
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
getNext().rollbackTransaction(context, xid);
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
getNext().send(producerExchange, messageSend);
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
getNext().beginTransaction(context, xid);
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
getNext().forgetTransaction(context, transactionId);
}
@Override
public Connection[] getClients() throws Exception {
return getNext().getClients();
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
return getNext().addDestination(context, destination,createIfTemporary);
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
getNext().removeDestination(context, destination, timeout);
}
@Override
public ActiveMQDestination[] getDestinations() throws Exception {
return getNext().getDestinations();
}
@Override
public void start() throws Exception {
getNext().start();
}
@Override
public void stop() throws Exception {
getNext().stop();
}
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
getNext().addSession(context, info);
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
getNext().removeSession(context, info);
}
@Override
public BrokerId getBrokerId() {
return getNext().getBrokerId();
}
@Override
public String getBrokerName() {
return getNext().getBrokerName();
}
@Override
public void gc() {
getNext().gc();
}
@Override
public void addBroker(Connection connection, BrokerInfo info) {
getNext().addBroker(connection, info);
}
@Override
public void removeBroker(Connection connection, BrokerInfo info) {
getNext().removeBroker(connection, info);
}
@Override
public BrokerInfo[] getPeerBrokerInfos() {
return getNext().getPeerBrokerInfos();
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
getNext().preProcessDispatch(messageDispatch);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
getNext().postProcessDispatch(messageDispatch);
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
getNext().processDispatchNotification(messageDispatchNotification);
}
@Override
public boolean isStopped() {
return getNext().isStopped();
}
@Override
public Set<ActiveMQDestination> getDurableDestinations() {
return getNext().getDurableDestinations();
}
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
getNext().addDestinationInfo(context, info);
}
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
getNext().removeDestinationInfo(context, info);
}
@Override
public boolean isFaultTolerantConfiguration() {
return getNext().isFaultTolerantConfiguration();
}
@Override
public ConnectionContext getAdminConnectionContext() {
return getNext().getAdminConnectionContext();
}
@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
getNext().setAdminConnectionContext(adminConnectionContext);
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return getNext().messagePull(context, pull);
}
@Override
public PListStore getTempDataStore() {
return getNext().getTempDataStore();
}
@Override
public URI getVmConnectorURI() {
return getNext().getVmConnectorURI();
}
@Override
public void brokerServiceStarted() {
getNext().brokerServiceStarted();
}
@Override
public BrokerService getBrokerService() {
return getNext().getBrokerService();
}
@Override
public boolean isExpired(MessageReference messageReference) {
return getNext().isExpired(messageReference);
}
@Override
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
getNext().messageExpired(context, message, subscription);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
getNext().sendToDeadLetterQueue(context, messageReference, subscription);
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
return getNext().sendToDeadLetterQueue(context, messageReference, subscription);
}
@Override
public Broker getRoot() {
return getNext().getRoot();
}
@Override
public long getBrokerSequenceId() {
return getNext().getBrokerSequenceId();
}
@Override
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
getNext().fastProducer(context, producerInfo, destination);
}
@Override
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
getNext().isFull(context,destination, usage);
}
@Override
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
getNext().messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
getNext().messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
getNext().messageDiscarded(context, sub, messageReference);
}
@Override
public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
getNext().slowConsumer(context, dest,subs);
}
public void nowMasterBroker() {
@Override
public void nowMasterBroker() {
getNext().nowMasterBroker();
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
getNext().processConsumerControl(consumerExchange, control);
}
@Override
public Scheduler getScheduler() {
return getNext().getScheduler();
}
@Override
public ThreadPoolExecutor getExecutor() {
return getNext().getExecutor();
}
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
}
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
getNext().networkBridgeStopped(brokerInfo);
}

View File

@ -701,7 +701,7 @@ public class RegionBroker extends EmptyBroker {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
try {
if (node != null) {
Message message = node.getMessage();
@ -726,6 +726,7 @@ public class RegionBroker extends EmptyBroker {
context.setBroker(getRoot());
}
BrokerSupport.resendNoCopy(context, message, deadLetterDestination);
return true;
}
} else {
if (LOG.isDebugEnabled()) {
@ -738,6 +739,8 @@ public class RegionBroker extends EmptyBroker {
} catch (Exception e) {
LOG.warn("Caught an exception sending to DLQ: " + node, e);
}
return false;
}
@Override

View File

@ -17,7 +17,9 @@
package org.apache.activemq.broker.util;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
@ -497,7 +499,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
@ -506,7 +508,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
LOG.info("Sending to DLQ : " + msg);
}
super.sendToDeadLetterQueue(context, messageReference, subscription);
return super.sendToDeadLetterQueue(context, messageReference, subscription);
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.util;
import java.io.IOException;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.Broker;
@ -126,10 +127,10 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
if (messageReference.isExpired()) {
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
super.sendToDeadLetterQueue(context, messageReference, subscription);
return super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
try {
Destination regionDestination = (Destination) messageReference.getRegionDestination();
@ -145,15 +146,17 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
} else if (isSendToDlqIfMaxRetriesExceeded()) {
super.sendToDeadLetterQueue(context, messageReference, subscription);
return super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId());
}
} else if (isFallbackToDeadLetter()) {
super.sendToDeadLetterQueue(context, messageReference, subscription);
return super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + regionDestination.getActiveMQDestination());
}
return false;
} catch (Exception exception) {
// abort the ack, will be effective if client use transactions or individual ack with sync send
RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
@ -203,5 +206,4 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
}
return 0;
}
}

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
package org.apache.activemq.plugin;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@ -28,8 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Filip Hanik
* @version 1.0
*/
public class DiscardingDLQBroker extends BrokerFilter {
public static Logger log = LoggerFactory.getLogger(DiscardingDLQBroker.class);
@ -45,8 +44,7 @@ public class DiscardingDLQBroker extends BrokerFilter {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef,
Subscription subscription) {
public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription) {
if (log.isTraceEnabled()) {
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
}
@ -58,35 +56,38 @@ public class DiscardingDLQBroker extends BrokerFilter {
dest = msg.getDestination();
destName = dest.getPhysicalName();
if (dest == null || destName == null ) {
//do nothing, no need to forward it
skipMessage("NULL DESTINATION",msgRef);
if (dest == null || destName == null) {
// do nothing, no need to forward it
skipMessage("NULL DESTINATION", msgRef);
} else if (dropAll) {
//do nothing
skipMessage("dropAll",msgRef);
// do nothing
skipMessage("dropAll", msgRef);
} else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
//do nothing
skipMessage("dropTemporaryTopics",msgRef);
// do nothing
skipMessage("dropTemporaryTopics", msgRef);
} else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
//do nothing
skipMessage("dropTemporaryQueues",msgRef);
} else if (destFilter!=null && matches(destName)) {
//do nothing
skipMessage("dropOnly",msgRef);
// do nothing
skipMessage("dropTemporaryQueues", msgRef);
} else if (destFilter != null && matches(destName)) {
// do nothing
skipMessage("dropOnly", msgRef);
} else {
dropped = false;
next.sendToDeadLetterQueue(ctx, msgRef, subscription);
return next.sendToDeadLetterQueue(ctx, msgRef, subscription);
}
if (dropped && getReportInterval()>0) {
if ((++dropCount)%getReportInterval() == 0 ) {
log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
if (dropped && getReportInterval() > 0) {
if ((++dropCount) % getReportInterval() == 0) {
log.info("Total of " + dropCount + " messages were discarded, since their destination was the dead letter queue");
}
}
return false;
}
public boolean matches(String destName) {
for (int i=0; destFilter!=null && i<destFilter.length; i++) {
if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
for (int i = 0; destFilter != null && i < destFilter.length; i++) {
if (destFilter[i] != null && destFilter[i].matcher(destName).matches()) {
return true;
}
}
@ -95,7 +96,7 @@ public class DiscardingDLQBroker extends BrokerFilter {
private void skipMessage(String prefix, MessageReference msgRef) {
if (log.isDebugEnabled()) {
String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
String lmsg = "Discarding DLQ BrokerFilter[" + prefix + "] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null);
log.debug(lmsg);
}
}
@ -139,5 +140,4 @@ public class DiscardingDLQBroker extends BrokerFilter {
public int getReportInterval() {
return reportInterval;
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AMQ4517Test {
private BrokerService brokerService;
private String connectionUri;
@Before
public void setup() throws Exception {
brokerService = new BrokerService();
connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
// Configure Dead Letter Strategy
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
strategy.setProcessNonPersistent(false);
strategy.setProcessExpired(false);
// Add policy and individual DLQ strategy
PolicyEntry policy = new PolicyEntry();
policy.setTimeBeforeDispatchStarts(3000);
policy.setDeadLetterStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
brokerService.setPersistent(false);
brokerService.start();
}
@After
public void stop() throws Exception {
brokerService.stop();
}
@Test(timeout=360000)
public void test() throws Exception {
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
final AtomicBoolean advised = new AtomicBoolean(false);
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dlqDestination = session.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">");
MessageConsumer consumer = session.createConsumer(dlqDestination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
advised.set(true);
}
});
connection.start();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
@Override
public void run() {
try {
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(400);
producer.send(session.createTextMessage());
producer.send(session.createTextMessage());
TimeUnit.MILLISECONDS.sleep(500);
connection.close();
} catch (Exception e) {
}
}
});
service.shutdown();
assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get());
}
}