From 03785a31428ef444d07231c51b8bb83797b30f2c Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 11 Aug 2017 08:13:49 -0400 Subject: [PATCH] ARTEMIS-1347 - Pass the QueueBinding to beforeCreateConsumer Instead of just the queue name the entire QueueBinding object is now passed to the beforeCreateConsumer plugin method. Also cleaned up some comments --- .../core/server/impl/ServerSessionImpl.java | 2 +- .../server/plugin/ActiveMQServerPlugin.java | 58 ++++++++++++++++++- .../plugin/MethodCalledVerifier.java | 5 +- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 9dc7c2e08b..ed1a1710d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -455,7 +455,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { Filter filter = FilterImpl.createFilter(filterString); - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName, + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding, filterString, browseOnly, supportLargeMessage) : null); ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index b7dcdc3eb0..fddec48ece 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.MessageReference; @@ -47,6 +48,7 @@ public interface ActiveMQServerPlugin { * A connection has been created. * * @param connection The newly created connection + * @throws ActiveMQException */ default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { @@ -56,6 +58,7 @@ public interface ActiveMQServerPlugin { * A connection has been destroyed. * * @param connection + * @throws ActiveMQException */ default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { @@ -77,6 +80,7 @@ public interface ActiveMQServerPlugin { * @param autoCreateQueues * @param context * @param prefixes + * @throws ActiveMQException */ default void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, @@ -89,6 +93,7 @@ public interface ActiveMQServerPlugin { * After a session has been created. * * @param session The newly created session + * @throws ActiveMQException */ default void afterCreateSession(ServerSession session) throws ActiveMQException { @@ -99,6 +104,7 @@ public interface ActiveMQServerPlugin { * * @param session * @param failed + * @throws ActiveMQException */ default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException { @@ -109,6 +115,7 @@ public interface ActiveMQServerPlugin { * * @param session * @param failed + * @throws ActiveMQException */ default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { @@ -120,6 +127,7 @@ public interface ActiveMQServerPlugin { * @param session * @param key * @param data + * @throws ActiveMQException */ default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { @@ -131,6 +139,7 @@ public interface ActiveMQServerPlugin { * @param session * @param key * @param data + * @throws ActiveMQException */ default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { @@ -144,16 +153,39 @@ public interface ActiveMQServerPlugin { * @param filterString * @param browseOnly * @param supportLargeMessage + * @throws ActiveMQException + * + * @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean) */ + @Deprecated default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { } + + /** + * + * Before a consumer is created + * + * @param consumerID + * @param QueueBinding + * @param filterString + * @param browseOnly + * @param supportLargeMessage + * @throws ActiveMQException + */ + default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage); + } + /** * After a consumer has been created * * @param consumer the created consumer + * @throws ActiveMQException */ default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { @@ -164,6 +196,7 @@ public interface ActiveMQServerPlugin { * * @param consumer * @param failed + * @throws ActiveMQException */ default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { @@ -174,6 +207,7 @@ public interface ActiveMQServerPlugin { * * @param consumer * @param failed + * @throws ActiveMQException */ default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { @@ -183,6 +217,7 @@ public interface ActiveMQServerPlugin { * Before a queue is created * * @param queueConfig + * @throws ActiveMQException */ default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException { @@ -192,6 +227,7 @@ public interface ActiveMQServerPlugin { * After a queue has been created * * @param queue The newly created queue + * @throws ActiveMQException */ default void afterCreateQueue(Queue queue) throws ActiveMQException { @@ -205,6 +241,7 @@ public interface ActiveMQServerPlugin { * @param checkConsumerCount * @param removeConsumers * @param autoDeleteAddress + * @throws ActiveMQException */ default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { @@ -220,6 +257,7 @@ public interface ActiveMQServerPlugin { * @param checkConsumerCount * @param removeConsumers * @param autoDeleteAddress + * @throws ActiveMQException */ default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { @@ -234,6 +272,7 @@ public interface ActiveMQServerPlugin { * @param message * @param direct * @param noAutoCreateQueue + * @throws ActiveMQException */ default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { //by default call the old method for backwards compatibility @@ -249,6 +288,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue * @param result + * @throws ActiveMQException */ default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException { @@ -264,8 +304,9 @@ public interface ActiveMQServerPlugin { * @param message * @param direct * @param noAutoCreateQueue + * @throws ActiveMQException * - * @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} + * @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} */ @Deprecated default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { @@ -280,8 +321,9 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue * @param result + * @throws ActiveMQException * - * @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} + * @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} */ @Deprecated default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, @@ -296,6 +338,7 @@ public interface ActiveMQServerPlugin { * @param context * @param direct * @param rejectDuplicates + * @throws ActiveMQException */ default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { @@ -309,6 +352,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param rejectDuplicates * @param result + * @throws ActiveMQException */ default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, RoutingStatus result) throws ActiveMQException { @@ -320,6 +364,7 @@ public interface ActiveMQServerPlugin { * * @param consumer the consumer the message will be delivered to * @param reference message reference + * @throws ActiveMQException */ default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility @@ -331,6 +376,7 @@ public interface ActiveMQServerPlugin { * * @param consumer the consumer the message was delivered to * @param reference message reference + * @throws ActiveMQException */ default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility @@ -341,6 +387,7 @@ public interface ActiveMQServerPlugin { * Before a message is delivered to a client consumer * * @param reference + * @throws ActiveMQException * * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)} */ @@ -353,8 +400,9 @@ public interface ActiveMQServerPlugin { * After a message is delivered to a client consumer * * @param reference + * @throws ActiveMQException * - * @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)} + * @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)} */ @Deprecated default void afterDeliver(MessageReference reference) throws ActiveMQException { @@ -366,6 +414,7 @@ public interface ActiveMQServerPlugin { * * @param message The expired message * @param messageExpiryAddress The message expiry address if exists + * @throws ActiveMQException */ default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { @@ -376,6 +425,7 @@ public interface ActiveMQServerPlugin { * * @param ref The acked message * @param reason The ack reason + * @throws ActiveMQException */ default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { @@ -385,6 +435,7 @@ public interface ActiveMQServerPlugin { * Before a bridge is deployed * * @param config The bridge configuration + * @throws ActiveMQException */ default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException { @@ -394,6 +445,7 @@ public interface ActiveMQServerPlugin { * After a bridge has been deployed * * @param bridge The newly deployed bridge + * @throws ActiveMQException */ default void afterDeployBridge(Bridge bridge) throws ActiveMQException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index c42dd11a04..dbb659354f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -18,6 +18,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.MessageReference; @@ -142,9 +143,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { } @Override - public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + public void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString, boolean browseOnly, boolean supportLargeMessage) { - Preconditions.checkNotNull(queueName); + Preconditions.checkNotNull(queueBinding); methodCalled(BEFORE_CREATE_CONSUMER); }