ARTEMIS-1347 - Pass the QueueBinding to beforeCreateConsumer

Instead of just the queue name the entire QueueBinding object is now
passed to the beforeCreateConsumer plugin method. Also cleaned up some
comments
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-08-11 08:13:49 -04:00 committed by Justin Bertram
parent 10da406a12
commit 03785a3142
3 changed files with 59 additions and 6 deletions

View File

@ -455,7 +455,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Filter filter = FilterImpl.createFilter(filterString);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding,
filterString, browseOnly, supportLargeMessage) : null);
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -47,6 +48,7 @@ public interface ActiveMQServerPlugin {
* A connection has been created.
*
* @param connection The newly created connection
* @throws ActiveMQException
*/
default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
@ -56,6 +58,7 @@ public interface ActiveMQServerPlugin {
* A connection has been destroyed.
*
* @param connection
* @throws ActiveMQException
*/
default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
@ -77,6 +80,7 @@ public interface ActiveMQServerPlugin {
* @param autoCreateQueues
* @param context
* @param prefixes
* @throws ActiveMQException
*/
default void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
@ -89,6 +93,7 @@ public interface ActiveMQServerPlugin {
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
default void afterCreateSession(ServerSession session) throws ActiveMQException {
@ -99,6 +104,7 @@ public interface ActiveMQServerPlugin {
*
* @param session
* @param failed
* @throws ActiveMQException
*/
default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
@ -109,6 +115,7 @@ public interface ActiveMQServerPlugin {
*
* @param session
* @param failed
* @throws ActiveMQException
*/
default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
@ -120,6 +127,7 @@ public interface ActiveMQServerPlugin {
* @param session
* @param key
* @param data
* @throws ActiveMQException
*/
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
@ -131,6 +139,7 @@ public interface ActiveMQServerPlugin {
* @param session
* @param key
* @param data
* @throws ActiveMQException
*/
default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
@ -144,16 +153,39 @@ public interface ActiveMQServerPlugin {
* @param filterString
* @param browseOnly
* @param supportLargeMessage
* @throws ActiveMQException
*
* @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean)
*/
@Deprecated
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
}
/**
*
* Before a consumer is created
*
* @param consumerID
* @param QueueBinding
* @param filterString
* @param browseOnly
* @param supportLargeMessage
* @throws ActiveMQException
*/
default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
//by default call the old method for backwards compatibility
this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage);
}
/**
* After a consumer has been created
*
* @param consumer the created consumer
* @throws ActiveMQException
*/
default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
@ -164,6 +196,7 @@ public interface ActiveMQServerPlugin {
*
* @param consumer
* @param failed
* @throws ActiveMQException
*/
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
@ -174,6 +207,7 @@ public interface ActiveMQServerPlugin {
*
* @param consumer
* @param failed
* @throws ActiveMQException
*/
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
@ -183,6 +217,7 @@ public interface ActiveMQServerPlugin {
* Before a queue is created
*
* @param queueConfig
* @throws ActiveMQException
*/
default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
@ -192,6 +227,7 @@ public interface ActiveMQServerPlugin {
* After a queue has been created
*
* @param queue The newly created queue
* @throws ActiveMQException
*/
default void afterCreateQueue(Queue queue) throws ActiveMQException {
@ -205,6 +241,7 @@ public interface ActiveMQServerPlugin {
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
* @throws ActiveMQException
*/
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
@ -220,6 +257,7 @@ public interface ActiveMQServerPlugin {
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
* @throws ActiveMQException
*/
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
@ -234,6 +272,7 @@ public interface ActiveMQServerPlugin {
* @param message
* @param direct
* @param noAutoCreateQueue
* @throws ActiveMQException
*/
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
//by default call the old method for backwards compatibility
@ -249,6 +288,7 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param noAutoCreateQueue
* @param result
* @throws ActiveMQException
*/
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException {
@ -264,8 +304,9 @@ public interface ActiveMQServerPlugin {
* @param message
* @param direct
* @param noAutoCreateQueue
* @throws ActiveMQException
*
* @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
*/
@Deprecated
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
@ -280,8 +321,9 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param noAutoCreateQueue
* @param result
* @throws ActiveMQException
*
* @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
*/
@Deprecated
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
@ -296,6 +338,7 @@ public interface ActiveMQServerPlugin {
* @param context
* @param direct
* @param rejectDuplicates
* @throws ActiveMQException
*/
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
@ -309,6 +352,7 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param rejectDuplicates
* @param result
* @throws ActiveMQException
*/
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) throws ActiveMQException {
@ -320,6 +364,7 @@ public interface ActiveMQServerPlugin {
*
* @param consumer the consumer the message will be delivered to
* @param reference message reference
* @throws ActiveMQException
*/
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility
@ -331,6 +376,7 @@ public interface ActiveMQServerPlugin {
*
* @param consumer the consumer the message was delivered to
* @param reference message reference
* @throws ActiveMQException
*/
default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility
@ -341,6 +387,7 @@ public interface ActiveMQServerPlugin {
* Before a message is delivered to a client consumer
*
* @param reference
* @throws ActiveMQException
*
* @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)}
*/
@ -353,8 +400,9 @@ public interface ActiveMQServerPlugin {
* After a message is delivered to a client consumer
*
* @param reference
* @throws ActiveMQException
*
* @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)}
* @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)}
*/
@Deprecated
default void afterDeliver(MessageReference reference) throws ActiveMQException {
@ -366,6 +414,7 @@ public interface ActiveMQServerPlugin {
*
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
* @throws ActiveMQException
*/
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
@ -376,6 +425,7 @@ public interface ActiveMQServerPlugin {
*
* @param ref The acked message
* @param reason The ack reason
* @throws ActiveMQException
*/
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
@ -385,6 +435,7 @@ public interface ActiveMQServerPlugin {
* Before a bridge is deployed
*
* @param config The bridge configuration
* @throws ActiveMQException
*/
default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
@ -394,6 +445,7 @@ public interface ActiveMQServerPlugin {
* After a bridge has been deployed
*
* @param bridge The newly deployed bridge
* @throws ActiveMQException
*/
default void afterDeployBridge(Bridge bridge) throws ActiveMQException {

View File

@ -18,6 +18,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -142,9 +143,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
}
@Override
public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
public void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) {
Preconditions.checkNotNull(queueName);
Preconditions.checkNotNull(queueBinding);
methodCalled(BEFORE_CREATE_CONSUMER);
}