diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index a213e6d024..caccee3c75 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -514,8 +514,13 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); + try { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); + } catch (ActiveMQException t) { + logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } if (logger.isTraceEnabled()) { logger.trace("Connection created " + connection); } @@ -537,7 +542,13 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif if (conn != null && !conn.connection.isSupportReconnect()) { RemotingConnection removedConnection = removeConnection(connectionID); if (removedConnection != null) { - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null); + try { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null); + } catch (ActiveMQException t) { + logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t); + conn.connection.fail(t); + return; + } } conn.connection.fail(new ActiveMQRemoteDisconnectException()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 78ce8a2a5c..a3b93ea692 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -198,7 +199,7 @@ public interface ActiveMQServer extends ServiceComponent { List getBrokerPlugins(); - void callBrokerPlugins(ActiveMQPluginRunnable pluginRun); + void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; boolean hasBrokerPlugins(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 7980e6e98a..700b82757d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.server; -import javax.json.JsonArrayBuilder; -import javax.transaction.xa.Xid; import java.util.List; import java.util.Set; +import javax.json.JsonArrayBuilder; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -208,9 +209,9 @@ public interface ServerSession extends SecurityAuth { Set getServerConsumers(); - void addMetaData(String key, String data); + void addMetaData(String key, String data) throws Exception; - boolean addUniqueMetaData(String key, String data); + boolean addUniqueMetaData(String key, String data) throws Exception; String getMetaData(String key); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index fe523077ae..12848bbec3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import javax.management.MBeanServer; import java.io.File; import java.io.IOException; import java.io.PrintWriter; @@ -48,8 +47,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import javax.management.MBeanServer; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -164,11 +166,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.SecurityFormatter; import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.VersionLoader; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -1843,9 +1845,20 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) { + public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException { if (pluginRun != null) { - getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin)); + for (ActiveMQServerPlugin plugin : getBrokerPlugins()) { + try { + pluginRun.run(plugin); + } catch (Throwable e) { + if (e instanceof ActiveMQException) { + logger.debug("plugin " + plugin + " is throwing ActiveMQException"); + throw (ActiveMQException) e; + } else { + logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e); + } + } + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 8e557d3b06..f3617c170b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1383,7 +1383,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public void addMetaData(String key, String data) { + public void addMetaData(String key, String data) throws Exception { server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null); if (metaData == null) { metaData = new HashMap<>(); @@ -1393,7 +1393,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public boolean addUniqueMetaData(String key, String data) { + public boolean addUniqueMetaData(String key, String data) throws Exception { ServerSession sessionWithMetaData = server.lookupSession(key, data); if (sessionWithMetaData != null && sessionWithMetaData != this) { // There is a duplication of this property diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java index bc85475cd7..4abe95df01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java @@ -17,8 +17,10 @@ package org.apache.activemq.artemis.core.server.plugin; +import org.apache.activemq.artemis.api.core.ActiveMQException; + public interface ActiveMQPluginRunnable { - void run(ActiveMQServerPlugin plugin); + void run(ActiveMQServerPlugin plugin) throws ActiveMQException; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index 770c4291bb..b1eab6647c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.plugin; import java.util.Map; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -47,7 +48,7 @@ public interface ActiveMQServerPlugin { * * @param connection The newly created connection */ - default void afterCreateConnection(RemotingConnection connection) { + default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { } @@ -56,7 +57,7 @@ public interface ActiveMQServerPlugin { * * @param connection */ - default void afterDestroyConnection(RemotingConnection connection) { + default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { } @@ -80,7 +81,7 @@ public interface ActiveMQServerPlugin { default void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, - Map prefixes) { + Map prefixes) throws ActiveMQException { } @@ -89,7 +90,7 @@ public interface ActiveMQServerPlugin { * * @param session The newly created session */ - default void afterCreateSession(ServerSession session) { + default void afterCreateSession(ServerSession session) throws ActiveMQException { } @@ -99,7 +100,7 @@ public interface ActiveMQServerPlugin { * @param session * @param failed */ - default void beforeCloseSession(ServerSession session, boolean failed) { + default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException { } @@ -109,7 +110,7 @@ public interface ActiveMQServerPlugin { * @param session * @param failed */ - default void afterCloseSession(ServerSession session, boolean failed) { + default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { } @@ -120,7 +121,7 @@ public interface ActiveMQServerPlugin { * @param key * @param data */ - default void beforeSessionMetadataAdded(ServerSession session, String key, String data) { + default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { } @@ -131,7 +132,7 @@ public interface ActiveMQServerPlugin { * @param key * @param data */ - default void afterSessionMetadataAdded(ServerSession session, String key, String data) { + default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { } @@ -145,7 +146,7 @@ public interface ActiveMQServerPlugin { * @param supportLargeMessage */ default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, - boolean browseOnly, boolean supportLargeMessage) { + boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { } @@ -154,7 +155,7 @@ public interface ActiveMQServerPlugin { * * @param consumer the created consumer */ - default void afterCreateConsumer(ServerConsumer consumer) { + default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { } @@ -164,7 +165,7 @@ public interface ActiveMQServerPlugin { * @param consumer * @param failed */ - default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { } @@ -174,7 +175,7 @@ public interface ActiveMQServerPlugin { * @param consumer * @param failed */ - default void afterCloseConsumer(ServerConsumer consumer, boolean failed) { + default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { } @@ -183,7 +184,7 @@ public interface ActiveMQServerPlugin { * * @param queueConfig */ - default void beforeCreateQueue(QueueConfig queueConfig) { + default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException { } @@ -192,7 +193,7 @@ public interface ActiveMQServerPlugin { * * @param queue The newly created queue */ - default void afterCreateQueue(Queue queue) { + default void afterCreateQueue(Queue queue) throws ActiveMQException { } @@ -206,7 +207,7 @@ public interface ActiveMQServerPlugin { * @param autoDeleteAddress */ default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, - boolean removeConsumers, boolean autoDeleteAddress) { + boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { } @@ -221,7 +222,7 @@ public interface ActiveMQServerPlugin { * @param autoDeleteAddress */ default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, - boolean removeConsumers, boolean autoDeleteAddress) { + boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { } @@ -234,7 +235,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue */ - default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { //by default call the old method for backwards compatibility this.beforeSend(tx, message, direct, noAutoCreateQueue); } @@ -250,7 +251,7 @@ public interface ActiveMQServerPlugin { * @param result */ default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, - RoutingStatus result) { + RoutingStatus result) throws ActiveMQException { //by default call the old method for backwards compatibility this.afterSend(tx, message, direct, noAutoCreateQueue, result); } @@ -264,10 +265,10 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue * - * @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} + * @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} */ @Deprecated - default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { } @@ -280,11 +281,11 @@ public interface ActiveMQServerPlugin { * @param noAutoCreateQueue * @param result * - * @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} + * @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} */ @Deprecated default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, - RoutingStatus result) { + RoutingStatus result) throws ActiveMQException { } @@ -296,7 +297,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param rejectDuplicates */ - default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { + default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { } @@ -310,7 +311,7 @@ public interface ActiveMQServerPlugin { * @param result */ default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, - RoutingStatus result) { + RoutingStatus result) throws ActiveMQException { } @@ -320,7 +321,7 @@ public interface ActiveMQServerPlugin { * @param consumer the consumer the message will be delivered to * @param reference message reference */ - default void beforeDeliver(ServerConsumer consumer, MessageReference reference) { + default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility this.beforeDeliver(reference); } @@ -331,7 +332,7 @@ public interface ActiveMQServerPlugin { * @param consumer the consumer the message was delivered to * @param reference message reference */ - default void afterDeliver(ServerConsumer consumer, MessageReference reference) { + default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility this.afterDeliver(reference); } @@ -341,10 +342,10 @@ public interface ActiveMQServerPlugin { * * @param reference * - * @deprecated use {@link #beforeDeliver(ServerConsumer, MessageReference)} + * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)} */ @Deprecated - default void beforeDeliver(MessageReference reference) { + default void beforeDeliver(MessageReference reference) throws ActiveMQException { } @@ -353,10 +354,10 @@ public interface ActiveMQServerPlugin { * * @param reference * - * @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)} + * @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)} */ @Deprecated - default void afterDeliver(MessageReference reference) { + default void afterDeliver(MessageReference reference) throws ActiveMQException { } @@ -366,7 +367,7 @@ public interface ActiveMQServerPlugin { * @param message The expired message * @param messageExpiryAddress The message expiry address if exists */ - default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { } @@ -376,7 +377,7 @@ public interface ActiveMQServerPlugin { * @param ref The acked message * @param reason The ack reason */ - default void messageAcknowledged(MessageReference ref, AckReason reason) { + default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { } @@ -385,7 +386,7 @@ public interface ActiveMQServerPlugin { * * @param config The bridge configuration */ - default void beforeDeployBridge(BridgeConfiguration config) { + default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException { } @@ -394,7 +395,7 @@ public interface ActiveMQServerPlugin { * * @param bridge The newly deployed bridge */ - default void afterDeployBridge(Bridge bridge) { + default void afterDeployBridge(Bridge bridge) throws ActiveMQException { }