From 5076808b57d0152142a76b0c91c7519434a2454c Mon Sep 17 00:00:00 2001 From: Hadrian Zbarcea Date: Fri, 4 Nov 2016 12:27:18 -0400 Subject: [PATCH] AMQ-6497 Add method to navigate interceptors --- .../activemq/advisory/AdvisoryBroker.java | 2 +- .../org/apache/activemq/broker/Broker.java | 4 +- .../apache/activemq/broker/BrokerFilter.java | 149 +++---- .../apache/activemq/broker/EmptyBroker.java | 55 +-- .../apache/activemq/broker/ErrorBroker.java | 18 +- .../activemq/broker/MutableBrokerFilter.java | 374 +----------------- .../apache/activemq/broker/region/Region.java | 2 +- .../activemq/broker/region/RegionBroker.java | 9 - .../broker/util/LoggingBrokerPlugin.java | 2 +- .../broker/BrokerInterceptorsTest.java | 61 +++ .../activemq/broker/LinkStealingTest.java | 8 +- .../camel/camelplugin/CamelRoutesBroker.java | 2 +- 12 files changed, 167 insertions(+), 519 deletions(-) create mode 100644 activemq-broker/src/test/java/org/apache/activemq/broker/BrokerInterceptorsTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 556c149238..4118dbe91d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -683,7 +683,7 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void isFull(ConnectionContext context, Destination destination, Usage usage) { + public void isFull(ConnectionContext context, Destination destination, Usage usage) { super.isFull(context, destination, usage); if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index 87cb3bcd2f..6ab9f2cada 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -52,7 +52,7 @@ public interface Broker extends Region, Service { * @param type * @return a Broker instance. */ - Broker getAdaptor(Class type); + Broker getAdaptor(Class type); /** * Get the id of the broker @@ -384,7 +384,7 @@ public interface Broker extends Region, Service { * @param destination * @param usage */ - void isFull(ConnectionContext context,Destination destination,Usage usage); + void isFull(ConnectionContext context,Destination destination,Usage usage); void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 2a8ae718da..ea7353c270 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -60,351 +60,352 @@ public class BrokerFilter implements Broker { this.next = next; } + public Broker getNext() { + return next; + } + @Override - public Broker getAdaptor(Class type) { - if (type.isInstance(this)) { - return this; - } - return next.getAdaptor(type); + public Broker getAdaptor(Class type) { + return type.isInstance(this) ? this : getNext().getAdaptor(type); } @Override public Map getDestinationMap() { - return next.getDestinationMap(); + return getNext().getDestinationMap(); } @Override public Map getDestinationMap(ActiveMQDestination destination) { - return next.getDestinationMap(destination); + return getNext().getDestinationMap(destination); } @Override - public Set getDestinations(ActiveMQDestination destination) { - return next.getDestinations(destination); + public Set getDestinations(ActiveMQDestination destination) { + return getNext().getDestinations(destination); } @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { - next.acknowledge(consumerExchange, ack); + getNext().acknowledge(consumerExchange, ack); } @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { - return next.messagePull(context, pull); + return getNext().messagePull(context, pull); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - next.addConnection(context, info); + getNext().addConnection(context, info); } @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - return next.addConsumer(context, info); + return getNext().addConsumer(context, info); } @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - next.addProducer(context, info); + getNext().addProducer(context, info); } @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { - next.commitTransaction(context, xid, onePhase); + getNext().commitTransaction(context, xid, onePhase); } @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - next.removeSubscription(context, info); + getNext().removeSubscription(context, info); } @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { - return next.getPreparedTransactions(context); + return getNext().getPreparedTransactions(context); } @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - return next.prepareTransaction(context, xid); + return getNext().prepareTransaction(context, xid); } @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - next.removeConnection(context, info, error); + getNext().removeConnection(context, info, error); } @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - next.removeConsumer(context, info); + getNext().removeConsumer(context, info); } @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { - next.removeProducer(context, info); + getNext().removeProducer(context, info); } @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { - next.rollbackTransaction(context, xid); + getNext().rollbackTransaction(context, xid); } @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - next.send(producerExchange, messageSend); + getNext().send(producerExchange, messageSend); } @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - next.beginTransaction(context, xid); + getNext().beginTransaction(context, xid); } @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { - next.forgetTransaction(context, transactionId); + getNext().forgetTransaction(context, transactionId); } @Override public Connection[] getClients() throws Exception { - return next.getClients(); + return getNext().getClients(); } @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { - return next.addDestination(context, destination,createIfTemporary); + return getNext().addDestination(context, destination,createIfTemporary); } @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - next.removeDestination(context, destination, timeout); + getNext().removeDestination(context, destination, timeout); } @Override public ActiveMQDestination[] getDestinations() throws Exception { - return next.getDestinations(); + return getNext().getDestinations(); } @Override public void start() throws Exception { - next.start(); + getNext().start(); } @Override public void stop() throws Exception { - next.stop(); + getNext().stop(); } @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { - next.addSession(context, info); + getNext().addSession(context, info); } @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { - next.removeSession(context, info); + getNext().removeSession(context, info); } @Override public BrokerId getBrokerId() { - return next.getBrokerId(); + return getNext().getBrokerId(); } @Override public String getBrokerName() { - return next.getBrokerName(); + return getNext().getBrokerName(); } @Override public void gc() { - next.gc(); + getNext().gc(); } @Override public void addBroker(Connection connection, BrokerInfo info) { - next.addBroker(connection, info); + getNext().addBroker(connection, info); } @Override public void removeBroker(Connection connection, BrokerInfo info) { - next.removeBroker(connection, info); + getNext().removeBroker(connection, info); } @Override public BrokerInfo[] getPeerBrokerInfos() { - return next.getPeerBrokerInfos(); + return getNext().getPeerBrokerInfos(); } @Override public void preProcessDispatch(MessageDispatch messageDispatch) { - next.preProcessDispatch(messageDispatch); + getNext().preProcessDispatch(messageDispatch); } @Override public void postProcessDispatch(MessageDispatch messageDispatch) { - next.postProcessDispatch(messageDispatch); + getNext().postProcessDispatch(messageDispatch); } @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { - next.processDispatchNotification(messageDispatchNotification); + getNext().processDispatchNotification(messageDispatchNotification); } @Override public boolean isStopped() { - return next.isStopped(); + return getNext().isStopped(); } @Override public Set getDurableDestinations() { - return next.getDurableDestinations(); + return getNext().getDurableDestinations(); } @Override public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - next.addDestinationInfo(context, info); + getNext().addDestinationInfo(context, info); } @Override public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - next.removeDestinationInfo(context, info); + getNext().removeDestinationInfo(context, info); } @Override public boolean isFaultTolerantConfiguration() { - return next.isFaultTolerantConfiguration(); + return getNext().isFaultTolerantConfiguration(); } @Override public ConnectionContext getAdminConnectionContext() { - return next.getAdminConnectionContext(); + return getNext().getAdminConnectionContext(); } @Override public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { - next.setAdminConnectionContext(adminConnectionContext); + getNext().setAdminConnectionContext(adminConnectionContext); } @Override public PListStore getTempDataStore() { - return next.getTempDataStore(); + return getNext().getTempDataStore(); } @Override public URI getVmConnectorURI() { - return next.getVmConnectorURI(); + return getNext().getVmConnectorURI(); } @Override public void brokerServiceStarted() { - next.brokerServiceStarted(); + getNext().brokerServiceStarted(); } @Override public BrokerService getBrokerService() { - return next.getBrokerService(); + return getNext().getBrokerService(); } @Override public boolean isExpired(MessageReference messageReference) { - return next.isExpired(messageReference); + return getNext().isExpired(messageReference); } @Override public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { - next.messageExpired(context, message, subscription); + getNext().messageExpired(context, message, subscription); } @Override public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) { - return next.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); + return getNext().sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); } @Override public Broker getRoot() { - return next.getRoot(); + return getNext().getRoot(); } @Override public long getBrokerSequenceId() { - return next.getBrokerSequenceId(); + return getNext().getBrokerSequenceId(); } @Override public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { - next.fastProducer(context, producerInfo, destination); + getNext().fastProducer(context, producerInfo, destination); } @Override - public void isFull(ConnectionContext context,Destination destination, Usage usage) { - next.isFull(context,destination, usage); + public void isFull(ConnectionContext context,Destination destination, Usage usage) { + getNext().isFull(context,destination, usage); } @Override public void messageConsumed(ConnectionContext context,MessageReference messageReference) { - next.messageConsumed(context, messageReference); + getNext().messageConsumed(context, messageReference); } @Override public void messageDelivered(ConnectionContext context,MessageReference messageReference) { - next.messageDelivered(context, messageReference); + getNext().messageDelivered(context, messageReference); } @Override public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) { - next.messageDiscarded(context, sub, messageReference); + getNext().messageDiscarded(context, sub, messageReference); } @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { - next.slowConsumer(context, destination,subs); + getNext().slowConsumer(context, destination,subs); } @Override public void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination) { - next.virtualDestinationAdded(context, virtualDestination); + getNext().virtualDestinationAdded(context, virtualDestination); } @Override public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination) { - next.virtualDestinationRemoved(context, virtualDestination); + getNext().virtualDestinationRemoved(context, virtualDestination); } @Override public void nowMasterBroker() { - next.nowMasterBroker(); + getNext().nowMasterBroker(); } @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { - next.processConsumerControl(consumerExchange, control); + getNext().processConsumerControl(consumerExchange, control); } @Override public void reapplyInterceptor() { - next.reapplyInterceptor(); + getNext().reapplyInterceptor(); } @Override public Scheduler getScheduler() { - return next.getScheduler(); + return getNext().getScheduler(); } @Override public ThreadPoolExecutor getExecutor() { - return next.getExecutor(); + return getNext().getExecutor(); } @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { - next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); + getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); } @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { - next.networkBridgeStopped(brokerInfo); + getNext().networkBridgeStopped(brokerInfo); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index c4059a08f5..e8350c1c67 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -65,112 +65,92 @@ public class EmptyBroker implements Broker { } @Override - public Broker getAdaptor(Class type) { - if (type.isInstance(this)) { - return this; - } - return null; + public Broker getAdaptor(Class type) { + return type.isInstance(this) ? this : null; } @Override - @SuppressWarnings("unchecked") public Map getDestinationMap() { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } @Override public Map getDestinationMap(ActiveMQDestination destination) { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } @Override - public Set getDestinations(ActiveMQDestination destination) { - return Collections.EMPTY_SET; + public Set getDestinations(ActiveMQDestination destination) { + return Collections.emptySet(); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - } @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - } @Override public void addSession(ConnectionContext context, SessionInfo info) throws Exception { - } @Override public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { - } @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - } @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { - } @Override public Connection[] getClients() throws Exception { - return null; } @Override public ActiveMQDestination[] getDestinations() throws Exception { - return null; } @Override public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { - return null; } @Override public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - } @Override public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - return 0; } @Override public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { - } @Override public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { - } @Override public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { - } @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean flag) throws Exception { - return null; } @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - } @Override @@ -180,47 +160,38 @@ public class EmptyBroker implements Broker { @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - } @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - } @Override public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { - } @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { - } @Override public void gc() { - } @Override public void start() throws Exception { - } @Override public void stop() throws Exception { - } @Override public void addBroker(Connection connection, BrokerInfo info) { - } @Override public void removeBroker(Connection connection, BrokerInfo info) { - } @Override @@ -238,7 +209,6 @@ public class EmptyBroker implements Broker { @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { - } @Override @@ -327,7 +297,7 @@ public class EmptyBroker implements Broker { } @Override - public void isFull(ConnectionContext context, Destination destination,Usage usage) { + public void isFull(ConnectionContext context, Destination destination,Usage usage) { } @Override @@ -347,13 +317,11 @@ public class EmptyBroker implements Broker { } @Override - public void virtualDestinationAdded(ConnectionContext context, - VirtualDestination virtualDestination) { + public void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination) { } @Override - public void virtualDestinationRemoved(ConnectionContext context, - VirtualDestination virtualDestination) { + public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination) { } @Override @@ -369,13 +337,11 @@ public class EmptyBroker implements Broker { } @Override - public void processConsumerControl(ConsumerBrokerExchange consumerExchange, - ConsumerControl control) { + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { } @Override public void reapplyInterceptor() { - } @Override @@ -387,4 +353,5 @@ public class EmptyBroker implements Broker { public ThreadPoolExecutor getExecutor() { return null; } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 35501e3150..f9271d7307 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -62,27 +62,23 @@ public class ErrorBroker implements Broker { } @Override - @SuppressWarnings("unchecked") public Map getDestinationMap() { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } @Override public Map getDestinationMap(ActiveMQDestination destination) { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } @Override - public Set getDestinations(ActiveMQDestination destination) { - return Collections.EMPTY_SET; + public Set getDestinations(ActiveMQDestination destination) { + return Collections.emptySet(); } @Override - public Broker getAdaptor(Class type) { - if (type.isInstance(this)) { - return this; - } - return null; + public Broker getAdaptor(Class type) { + return type.isInstance(this) ? this : null; } @Override @@ -338,7 +334,7 @@ public class ErrorBroker implements Broker { } @Override - public void isFull(ConnectionContext context,Destination destination, Usage usage) { + public void isFull(ConnectionContext context,Destination destination, Usage usage) { throw new BrokerStoppedException(this.message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 6306325fec..e7ed22f674 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -16,37 +16,8 @@ */ package org.apache.activemq.broker; -import java.net.URI; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.BrokerId; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerControl; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveSubscriptionInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.store.PListStore; -import org.apache.activemq.thread.Scheduler; -import org.apache.activemq.usage.Usage; - /** * Like a BrokerFilter but it allows you to switch the getNext().broker. This * has more overhead than a BrokerFilter since access to the getNext().broker @@ -54,16 +25,17 @@ import org.apache.activemq.usage.Usage; * * */ -public class MutableBrokerFilter implements Broker { +public class MutableBrokerFilter extends BrokerFilter { protected AtomicReference next = new AtomicReference(); public MutableBrokerFilter(Broker next) { + super(null); // prevent future code from using the inherited 'next' this.next.set(next); } @Override - public Broker getAdaptor(Class type) { + public Broker getAdaptor(Class type) { if (type.isInstance(this)) { return this; } @@ -78,344 +50,4 @@ public class MutableBrokerFilter implements Broker { this.next.set(next); } - @Override - public Map getDestinationMap() { - return getNext().getDestinationMap(); - } - - @Override - public Map getDestinationMap(ActiveMQDestination destination) { - return getNext().getDestinationMap(destination); - } - - @Override - public Set getDestinations(ActiveMQDestination destination) { - return getNext().getDestinations(destination); - } - - @Override - public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { - getNext().acknowledge(consumerExchange, ack); - } - - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - getNext().addConnection(context, info); - } - - @Override - public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - return getNext().addConsumer(context, info); - } - - @Override - public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - getNext().addProducer(context, info); - } - - @Override - public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { - getNext().commitTransaction(context, xid, onePhase); - } - - @Override - public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - getNext().removeSubscription(context, info); - } - - @Override - public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { - return getNext().getPreparedTransactions(context); - } - - @Override - public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - return getNext().prepareTransaction(context, xid); - } - - @Override - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - getNext().removeConnection(context, info, error); - } - - @Override - public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - getNext().removeConsumer(context, info); - } - - @Override - public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { - getNext().removeProducer(context, info); - } - - @Override - public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { - getNext().rollbackTransaction(context, xid); - } - - @Override - public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - getNext().send(producerExchange, messageSend); - } - - @Override - public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - getNext().beginTransaction(context, xid); - } - - @Override - public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { - getNext().forgetTransaction(context, transactionId); - } - - @Override - public Connection[] getClients() throws Exception { - return getNext().getClients(); - } - - @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { - return getNext().addDestination(context, destination,createIfTemporary); - } - - @Override - public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - getNext().removeDestination(context, destination, timeout); - } - - @Override - public ActiveMQDestination[] getDestinations() throws Exception { - return getNext().getDestinations(); - } - - @Override - public void start() throws Exception { - getNext().start(); - } - - @Override - public void stop() throws Exception { - getNext().stop(); - } - - @Override - public void addSession(ConnectionContext context, SessionInfo info) throws Exception { - getNext().addSession(context, info); - } - - @Override - public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { - getNext().removeSession(context, info); - } - - @Override - public BrokerId getBrokerId() { - return getNext().getBrokerId(); - } - - @Override - public String getBrokerName() { - return getNext().getBrokerName(); - } - - @Override - public void gc() { - getNext().gc(); - } - - @Override - public void addBroker(Connection connection, BrokerInfo info) { - getNext().addBroker(connection, info); - } - - @Override - public void removeBroker(Connection connection, BrokerInfo info) { - getNext().removeBroker(connection, info); - } - - @Override - public BrokerInfo[] getPeerBrokerInfos() { - return getNext().getPeerBrokerInfos(); - } - - @Override - public void preProcessDispatch(MessageDispatch messageDispatch) { - getNext().preProcessDispatch(messageDispatch); - } - - @Override - public void postProcessDispatch(MessageDispatch messageDispatch) { - getNext().postProcessDispatch(messageDispatch); - } - - @Override - public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { - getNext().processDispatchNotification(messageDispatchNotification); - } - - @Override - public boolean isStopped() { - return getNext().isStopped(); - } - - @Override - public Set getDurableDestinations() { - return getNext().getDurableDestinations(); - } - - @Override - public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - getNext().addDestinationInfo(context, info); - - } - - @Override - public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { - getNext().removeDestinationInfo(context, info); - - } - - @Override - public boolean isFaultTolerantConfiguration() { - return getNext().isFaultTolerantConfiguration(); - } - - @Override - public ConnectionContext getAdminConnectionContext() { - return getNext().getAdminConnectionContext(); - } - - @Override - public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { - getNext().setAdminConnectionContext(adminConnectionContext); - } - - @Override - public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { - return getNext().messagePull(context, pull); - } - - @Override - public PListStore getTempDataStore() { - return getNext().getTempDataStore(); - } - - @Override - public URI getVmConnectorURI() { - return getNext().getVmConnectorURI(); - } - - @Override - public void brokerServiceStarted() { - getNext().brokerServiceStarted(); - } - - @Override - public BrokerService getBrokerService() { - return getNext().getBrokerService(); - } - - @Override - public boolean isExpired(MessageReference messageReference) { - return getNext().isExpired(messageReference); - } - - @Override - public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { - getNext().messageExpired(context, message, subscription); - } - - @Override - public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, - Subscription subscription, Throwable poisonCause) { - return getNext().sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); - } - - @Override - public Broker getRoot() { - return getNext().getRoot(); - } - - @Override - public long getBrokerSequenceId() { - return getNext().getBrokerSequenceId(); - } - - @Override - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { - getNext().fastProducer(context, producerInfo, destination); - } - - @Override - public void isFull(ConnectionContext context,Destination destination, Usage usage) { - getNext().isFull(context,destination, usage); - } - - @Override - public void messageConsumed(ConnectionContext context,MessageReference messageReference) { - getNext().messageConsumed(context, messageReference); - } - - @Override - public void messageDelivered(ConnectionContext context,MessageReference messageReference) { - getNext().messageDelivered(context, messageReference); - } - - @Override - public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { - getNext().messageDiscarded(context, sub, messageReference); - } - - @Override - public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) { - getNext().slowConsumer(context, dest,subs); - } - - @Override - public void virtualDestinationAdded(ConnectionContext context, - VirtualDestination virtualDestination) { - getNext().virtualDestinationAdded(context, virtualDestination); - } - - @Override - public void virtualDestinationRemoved(ConnectionContext context, - VirtualDestination virtualDestination) { - getNext().virtualDestinationRemoved(context, virtualDestination); - } - - @Override - public void nowMasterBroker() { - getNext().nowMasterBroker(); - } - - @Override - public void processConsumerControl(ConsumerBrokerExchange consumerExchange, - ConsumerControl control) { - getNext().processConsumerControl(consumerExchange, control); - } - - @Override - public void reapplyInterceptor() { - getNext().reapplyInterceptor(); - } - - @Override - public Scheduler getScheduler() { - return getNext().getScheduler(); - } - - @Override - public ThreadPoolExecutor getExecutor() { - return getNext().getExecutor(); - } - - @Override - public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { - getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); - } - - @Override - public void networkBridgeStopped(BrokerInfo brokerInfo) { - getNext().networkBridgeStopped(brokerInfo); - } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java index d9d7c9d6c7..726a17978f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java @@ -146,7 +146,7 @@ public interface Region extends Service { * * @return a set of matching destination objects. */ - Set getDestinations(ActiveMQDestination destination); + Set getDestinations(ActiveMQDestination destination); void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 036eed3e31..98475501eb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -161,15 +161,6 @@ public class RegionBroker extends EmptyBroker { } } - @Override - @SuppressWarnings("rawtypes") - public Broker getAdaptor(Class type) { - if (type.isInstance(this)) { - return this; - } - return null; - } - public Region getQueueRegion() { return queueRegion; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index a0b17bd475..4f93bcd434 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -515,7 +515,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void isFull(ConnectionContext context, Destination destination, Usage usage) { + public void isFull(ConnectionContext context, Destination destination, Usage usage) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Destination is full: {}", destination.getName()); } diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerInterceptorsTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerInterceptorsTest.java new file mode 100644 index 0000000000..99334892cc --- /dev/null +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerInterceptorsTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Assert; + + +public class BrokerInterceptorsTest { + + private BrokerService brokerService; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setAdvisorySupport(true); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testNavigateInterceptors() throws Exception { + Broker b = brokerService.getBroker(); + Assert.assertTrue(b instanceof BrokerFilter); + + BrokerFilter bf = (BrokerFilter) b; + int count = 0; + while (bf != null) { + Broker next = bf.getNext(); + bf = next instanceof BrokerFilter ? (BrokerFilter) next : null; + count++; + } + // a few Broker interceptors are created because of the config (i.e. AdvisoryBroker) + Assert.assertTrue(count > 1); + } + +} diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java index 1e53f60460..df5399e9be 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.broker; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; @@ -34,6 +31,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + public class LinkStealingTest { private static final Logger LOG = LoggerFactory.getLogger(LinkStealingTest.class); @@ -44,7 +45,6 @@ public class LinkStealingTest { private String stealableConnectionURI; private String unstealableConnectionURI; - @SuppressWarnings("unchecked") @Before public void setUp() throws Exception { brokerService = new BrokerService(); diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java index 4d5bbb2d41..ee7890afde 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java @@ -162,7 +162,7 @@ public class CamelRoutesBroker extends BrokerFilter { } @Override - public void isFull(ConnectionContext context, Destination destination, Usage usage) { + public void isFull(ConnectionContext context, Destination destination, Usage usage) { blockWhileLoadingCamelRoutes(); super.isFull(context, destination, usage); }