diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 0d9195ff20..caccee3c75 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -516,8 +516,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); try { server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); - } catch (Throwable t) { + } catch (ActiveMQException t) { logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } if (logger.isTraceEnabled()) { logger.trace("Connection created " + connection); @@ -542,8 +544,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif if (removedConnection != null) { try { server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null); - } catch (Throwable t) { + } catch (ActiveMQException t) { logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t); + conn.connection.fail(t); + return; } } conn.connection.fail(new ActiveMQRemoteDisconnectException()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 2594aba9cf..a3b93ea692 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -198,7 +199,7 @@ public interface ActiveMQServer extends ServiceComponent { List getBrokerPlugins(); - void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws Exception; + void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; boolean hasBrokerPlugins(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f34abef71e..12848bbec3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -51,6 +51,7 @@ import javax.management.MBeanServer; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -1844,10 +1845,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws Exception { + public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException { if (pluginRun != null) { for (ActiveMQServerPlugin plugin : getBrokerPlugins()) { - pluginRun.run(plugin); + try { + pluginRun.run(plugin); + } catch (Throwable e) { + if (e instanceof ActiveMQException) { + logger.debug("plugin " + plugin + " is throwing ActiveMQException"); + throw (ActiveMQException) e; + } else { + logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e); + } + } } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java index b71433565b..4abe95df01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java @@ -17,8 +17,10 @@ package org.apache.activemq.artemis.core.server.plugin; +import org.apache.activemq.artemis.api.core.ActiveMQException; + public interface ActiveMQPluginRunnable { - void run(ActiveMQServerPlugin plugin) throws Exception; + void run(ActiveMQServerPlugin plugin) throws ActiveMQException; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index 60a11d8132..b1eab6647c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.plugin; import java.util.Map; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -47,7 +48,7 @@ public interface ActiveMQServerPlugin { * * @param connection The newly created connection */ - default void afterCreateConnection(RemotingConnection connection) throws Exception { + default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { } @@ -56,7 +57,7 @@ public interface ActiveMQServerPlugin { * * @param connection */ - default void afterDestroyConnection(RemotingConnection connection) throws Exception { + default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { } @@ -80,7 +81,7 @@ public interface ActiveMQServerPlugin { default void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, - Map prefixes) throws Exception { + Map prefixes) throws ActiveMQException { } @@ -89,7 +90,7 @@ public interface ActiveMQServerPlugin { * * @param session The newly created session */ - default void afterCreateSession(ServerSession session) throws Exception { + default void afterCreateSession(ServerSession session) throws ActiveMQException { } @@ -99,7 +100,7 @@ public interface ActiveMQServerPlugin { * @param session * @param failed */ - default void beforeCloseSession(ServerSession session, boolean failed) throws Exception { + default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException { } @@ -109,7 +110,7 @@ public interface ActiveMQServerPlugin { * @param session * @param failed */ - default void afterCloseSession(ServerSession session, boolean failed) throws Exception { + default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { } @@ -120,7 +121,7 @@ public interface ActiveMQServerPlugin { * @param key * @param data */ - default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws Exception { + default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { } @@ -131,7 +132,7 @@ public interface ActiveMQServerPlugin { * @param key * @param data */ - default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws Exception { + default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException { } @@ -145,7 +146,7 @@ public interface ActiveMQServerPlugin { * @param supportLargeMessage */ default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, - boolean browseOnly, boolean supportLargeMessage) throws Exception { + boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { } @@ -154,7 +155,7 @@ public interface ActiveMQServerPlugin { * * @param consumer the created consumer */ - default void afterCreateConsumer(ServerConsumer consumer) throws Exception { + default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { } @@ -164,7 +165,7 @@ public interface ActiveMQServerPlugin { * @param consumer * @param failed */ - default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws Exception { + default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { } @@ -174,7 +175,7 @@ public interface ActiveMQServerPlugin { * @param consumer * @param failed */ - default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws Exception { + default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { } @@ -183,7 +184,7 @@ public interface ActiveMQServerPlugin { * * @param queueConfig */ - default void beforeCreateQueue(QueueConfig queueConfig) throws Exception { + default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException { } @@ -192,7 +193,7 @@ public interface ActiveMQServerPlugin { * * @param queue The newly created queue */ - default void afterCreateQueue(Queue queue) throws Exception { + default void afterCreateQueue(Queue queue) throws ActiveMQException { } @@ -206,7 +207,7 @@ public interface ActiveMQServerPlugin { * @param autoDeleteAddress */ default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, - boolean removeConsumers, boolean autoDeleteAddress) throws Exception { + boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { } @@ -221,7 +222,7 @@ public interface ActiveMQServerPlugin { * @param autoDeleteAddress */ default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, - boolean removeConsumers, boolean autoDeleteAddress) throws Exception { + boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException { } @@ -234,7 +235,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue */ - default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws Exception { + default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { //by default call the old method for backwards compatibility this.beforeSend(tx, message, direct, noAutoCreateQueue); } @@ -250,7 +251,7 @@ public interface ActiveMQServerPlugin { * @param result */ default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, - RoutingStatus result) throws Exception { + RoutingStatus result) throws ActiveMQException { //by default call the old method for backwards compatibility this.afterSend(tx, message, direct, noAutoCreateQueue, result); } @@ -264,10 +265,10 @@ public interface ActiveMQServerPlugin { * @param direct * @param noAutoCreateQueue * - * @deprecated use throws Exception {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} + * @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} */ @Deprecated - default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws Exception { + default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { } @@ -280,11 +281,11 @@ public interface ActiveMQServerPlugin { * @param noAutoCreateQueue * @param result * - * @deprecated use throws Exception {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} + * @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} */ @Deprecated default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, - RoutingStatus result) throws Exception { + RoutingStatus result) throws ActiveMQException { } @@ -296,7 +297,7 @@ public interface ActiveMQServerPlugin { * @param direct * @param rejectDuplicates */ - default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception { + default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { } @@ -310,7 +311,7 @@ public interface ActiveMQServerPlugin { * @param result */ default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, - RoutingStatus result) throws Exception { + RoutingStatus result) throws ActiveMQException { } @@ -320,7 +321,7 @@ public interface ActiveMQServerPlugin { * @param consumer the consumer the message will be delivered to * @param reference message reference */ - default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws Exception { + default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility this.beforeDeliver(reference); } @@ -331,7 +332,7 @@ public interface ActiveMQServerPlugin { * @param consumer the consumer the message was delivered to * @param reference message reference */ - default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws Exception { + default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { //by default call the old method for backwards compatibility this.afterDeliver(reference); } @@ -341,10 +342,10 @@ public interface ActiveMQServerPlugin { * * @param reference * - * @deprecated use throws Exception {@link #beforeDeliver(ServerConsumer, MessageReference)} + * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)} */ @Deprecated - default void beforeDeliver(MessageReference reference) throws Exception { + default void beforeDeliver(MessageReference reference) throws ActiveMQException { } @@ -353,10 +354,10 @@ public interface ActiveMQServerPlugin { * * @param reference * - * @deprecated use throws Exception {@link #afterDeliver(ServerConsumer, MessageReference)} + * @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)} */ @Deprecated - default void afterDeliver(MessageReference reference) throws Exception { + default void afterDeliver(MessageReference reference) throws ActiveMQException { } @@ -366,7 +367,7 @@ public interface ActiveMQServerPlugin { * @param message The expired message * @param messageExpiryAddress The message expiry address if exists */ - default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws Exception { + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { } @@ -376,7 +377,7 @@ public interface ActiveMQServerPlugin { * @param ref The acked message * @param reason The ack reason */ - default void messageAcknowledged(MessageReference ref, AckReason reason) throws Exception { + default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { } @@ -385,7 +386,7 @@ public interface ActiveMQServerPlugin { * * @param config The bridge configuration */ - default void beforeDeployBridge(BridgeConfiguration config) throws Exception { + default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException { } @@ -394,7 +395,7 @@ public interface ActiveMQServerPlugin { * * @param bridge The newly deployed bridge */ - default void afterDeployBridge(Bridge bridge) throws Exception { + default void afterDeployBridge(Bridge bridge) throws ActiveMQException { }