This commit is contained in:
Justin Bertram 2017-08-17 10:14:12 -05:00
commit 9f6ee43fbb
3 changed files with 59 additions and 6 deletions

View File

@ -455,7 +455,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Filter filter = FilterImpl.createFilter(filterString); Filter filter = FilterImpl.createFilter(filterString);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName, server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding,
filterString, browseOnly, supportLargeMessage) : null); filterString, browseOnly, supportLargeMessage) : null);
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -47,6 +48,7 @@ public interface ActiveMQServerPlugin {
* A connection has been created. * A connection has been created.
* *
* @param connection The newly created connection * @param connection The newly created connection
* @throws ActiveMQException
*/ */
default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
@ -56,6 +58,7 @@ public interface ActiveMQServerPlugin {
* A connection has been destroyed. * A connection has been destroyed.
* *
* @param connection * @param connection
* @throws ActiveMQException
*/ */
default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
@ -77,6 +80,7 @@ public interface ActiveMQServerPlugin {
* @param autoCreateQueues * @param autoCreateQueues
* @param context * @param context
* @param prefixes * @param prefixes
* @throws ActiveMQException
*/ */
default void beforeCreateSession(String name, String username, int minLargeMessageSize, default void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
@ -89,6 +93,7 @@ public interface ActiveMQServerPlugin {
* After a session has been created. * After a session has been created.
* *
* @param session The newly created session * @param session The newly created session
* @throws ActiveMQException
*/ */
default void afterCreateSession(ServerSession session) throws ActiveMQException { default void afterCreateSession(ServerSession session) throws ActiveMQException {
@ -99,6 +104,7 @@ public interface ActiveMQServerPlugin {
* *
* @param session * @param session
* @param failed * @param failed
* @throws ActiveMQException
*/ */
default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException { default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
@ -109,6 +115,7 @@ public interface ActiveMQServerPlugin {
* *
* @param session * @param session
* @param failed * @param failed
* @throws ActiveMQException
*/ */
default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
@ -120,6 +127,7 @@ public interface ActiveMQServerPlugin {
* @param session * @param session
* @param key * @param key
* @param data * @param data
* @throws ActiveMQException
*/ */
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
@ -131,6 +139,7 @@ public interface ActiveMQServerPlugin {
* @param session * @param session
* @param key * @param key
* @param data * @param data
* @throws ActiveMQException
*/ */
default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
@ -144,16 +153,39 @@ public interface ActiveMQServerPlugin {
* @param filterString * @param filterString
* @param browseOnly * @param browseOnly
* @param supportLargeMessage * @param supportLargeMessage
* @throws ActiveMQException
*
* @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean)
*/ */
@Deprecated
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
} }
/**
*
* Before a consumer is created
*
* @param consumerID
* @param QueueBinding
* @param filterString
* @param browseOnly
* @param supportLargeMessage
* @throws ActiveMQException
*/
default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
//by default call the old method for backwards compatibility
this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage);
}
/** /**
* After a consumer has been created * After a consumer has been created
* *
* @param consumer the created consumer * @param consumer the created consumer
* @throws ActiveMQException
*/ */
default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
@ -164,6 +196,7 @@ public interface ActiveMQServerPlugin {
* *
* @param consumer * @param consumer
* @param failed * @param failed
* @throws ActiveMQException
*/ */
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
@ -174,6 +207,7 @@ public interface ActiveMQServerPlugin {
* *
* @param consumer * @param consumer
* @param failed * @param failed
* @throws ActiveMQException
*/ */
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
@ -183,6 +217,7 @@ public interface ActiveMQServerPlugin {
* Before a queue is created * Before a queue is created
* *
* @param queueConfig * @param queueConfig
* @throws ActiveMQException
*/ */
default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException { default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
@ -192,6 +227,7 @@ public interface ActiveMQServerPlugin {
* After a queue has been created * After a queue has been created
* *
* @param queue The newly created queue * @param queue The newly created queue
* @throws ActiveMQException
*/ */
default void afterCreateQueue(Queue queue) throws ActiveMQException { default void afterCreateQueue(Queue queue) throws ActiveMQException {
@ -205,6 +241,7 @@ public interface ActiveMQServerPlugin {
* @param checkConsumerCount * @param checkConsumerCount
* @param removeConsumers * @param removeConsumers
* @param autoDeleteAddress * @param autoDeleteAddress
* @throws ActiveMQException
*/ */
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
@ -220,6 +257,7 @@ public interface ActiveMQServerPlugin {
* @param checkConsumerCount * @param checkConsumerCount
* @param removeConsumers * @param removeConsumers
* @param autoDeleteAddress * @param autoDeleteAddress
* @throws ActiveMQException
*/ */
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
@ -234,6 +272,7 @@ public interface ActiveMQServerPlugin {
* @param message * @param message
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @throws ActiveMQException
*/ */
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
@ -249,6 +288,7 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @param result * @param result
* @throws ActiveMQException
*/ */
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException { RoutingStatus result) throws ActiveMQException {
@ -264,8 +304,9 @@ public interface ActiveMQServerPlugin {
* @param message * @param message
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @throws ActiveMQException
* *
* @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} * @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
*/ */
@Deprecated @Deprecated
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
@ -280,8 +321,9 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @param result * @param result
* @throws ActiveMQException
* *
* @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} * @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
*/ */
@Deprecated @Deprecated
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
@ -296,6 +338,7 @@ public interface ActiveMQServerPlugin {
* @param context * @param context
* @param direct * @param direct
* @param rejectDuplicates * @param rejectDuplicates
* @throws ActiveMQException
*/ */
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
@ -309,6 +352,7 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param rejectDuplicates * @param rejectDuplicates
* @param result * @param result
* @throws ActiveMQException
*/ */
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) throws ActiveMQException { RoutingStatus result) throws ActiveMQException {
@ -320,6 +364,7 @@ public interface ActiveMQServerPlugin {
* *
* @param consumer the consumer the message will be delivered to * @param consumer the consumer the message will be delivered to
* @param reference message reference * @param reference message reference
* @throws ActiveMQException
*/ */
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
@ -331,6 +376,7 @@ public interface ActiveMQServerPlugin {
* *
* @param consumer the consumer the message was delivered to * @param consumer the consumer the message was delivered to
* @param reference message reference * @param reference message reference
* @throws ActiveMQException
*/ */
default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
@ -341,6 +387,7 @@ public interface ActiveMQServerPlugin {
* Before a message is delivered to a client consumer * Before a message is delivered to a client consumer
* *
* @param reference * @param reference
* @throws ActiveMQException
* *
* @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)} * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)}
*/ */
@ -353,8 +400,9 @@ public interface ActiveMQServerPlugin {
* After a message is delivered to a client consumer * After a message is delivered to a client consumer
* *
* @param reference * @param reference
* @throws ActiveMQException
* *
* @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)} * @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)}
*/ */
@Deprecated @Deprecated
default void afterDeliver(MessageReference reference) throws ActiveMQException { default void afterDeliver(MessageReference reference) throws ActiveMQException {
@ -366,6 +414,7 @@ public interface ActiveMQServerPlugin {
* *
* @param message The expired message * @param message The expired message
* @param messageExpiryAddress The message expiry address if exists * @param messageExpiryAddress The message expiry address if exists
* @throws ActiveMQException
*/ */
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
@ -376,6 +425,7 @@ public interface ActiveMQServerPlugin {
* *
* @param ref The acked message * @param ref The acked message
* @param reason The ack reason * @param reason The ack reason
* @throws ActiveMQException
*/ */
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
@ -385,6 +435,7 @@ public interface ActiveMQServerPlugin {
* Before a bridge is deployed * Before a bridge is deployed
* *
* @param config The bridge configuration * @param config The bridge configuration
* @throws ActiveMQException
*/ */
default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException { default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
@ -394,6 +445,7 @@ public interface ActiveMQServerPlugin {
* After a bridge has been deployed * After a bridge has been deployed
* *
* @param bridge The newly deployed bridge * @param bridge The newly deployed bridge
* @throws ActiveMQException
*/ */
default void afterDeployBridge(Bridge bridge) throws ActiveMQException { default void afterDeployBridge(Bridge bridge) throws ActiveMQException {

View File

@ -18,6 +18,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -142,9 +143,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
} }
@Override @Override
public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, public void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) { boolean browseOnly, boolean supportLargeMessage) {
Preconditions.checkNotNull(queueName); Preconditions.checkNotNull(queueBinding);
methodCalled(BEFORE_CREATE_CONSUMER); methodCalled(BEFORE_CREATE_CONSUMER);
} }