ARTEMIS-1327 - Support checked exceptions in ActiveMQServerPlugin

This will allow plugin writers to use checked exceptions when writing
plugins
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-08-07 10:49:46 -04:00 committed by Clebert Suconic
parent f12116d5a5
commit a3a614ee8b
7 changed files with 60 additions and 49 deletions

View File

@ -514,8 +514,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
} catch (Throwable t) {
logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);
}
if (logger.isTraceEnabled()) {
logger.trace("Connection created " + connection);
}
@ -537,7 +540,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
if (conn != null && !conn.connection.isSupportReconnect()) {
RemotingConnection removedConnection = removeConnection(connectionID);
if (removedConnection != null) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
} catch (Throwable t) {
logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
}
}
conn.connection.fail(new ActiveMQRemoteDisconnectException());
}

View File

@ -198,7 +198,7 @@ public interface ActiveMQServer extends ServiceComponent {
List<ActiveMQServerPlugin> getBrokerPlugins();
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws Exception;
boolean hasBrokerPlugins();

View File

@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.server;
import javax.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Set;
import javax.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -208,9 +209,9 @@ public interface ServerSession extends SecurityAuth {
Set<ServerConsumer> getServerConsumers();
void addMetaData(String key, String data);
void addMetaData(String key, String data) throws Exception;
boolean addUniqueMetaData(String key, String data);
boolean addUniqueMetaData(String key, String data) throws Exception;
String getMetaData(String key);

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@ -48,6 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
@ -164,11 +165,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
@ -1843,9 +1844,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws Exception {
if (pluginRun != null) {
getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin));
for (ActiveMQServerPlugin plugin : getBrokerPlugins()) {
pluginRun.run(plugin);
}
}
}

View File

@ -1383,7 +1383,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public void addMetaData(String key, String data) {
public void addMetaData(String key, String data) throws Exception {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
if (metaData == null) {
metaData = new HashMap<>();
@ -1393,7 +1393,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public boolean addUniqueMetaData(String key, String data) {
public boolean addUniqueMetaData(String key, String data) throws Exception {
ServerSession sessionWithMetaData = server.lookupSession(key, data);
if (sessionWithMetaData != null && sessionWithMetaData != this) {
// There is a duplication of this property

View File

@ -19,6 +19,6 @@ package org.apache.activemq.artemis.core.server.plugin;
public interface ActiveMQPluginRunnable {
void run(ActiveMQServerPlugin plugin);
void run(ActiveMQServerPlugin plugin) throws Exception;
}

View File

@ -47,7 +47,7 @@ public interface ActiveMQServerPlugin {
*
* @param connection The newly created connection
*/
default void afterCreateConnection(RemotingConnection connection) {
default void afterCreateConnection(RemotingConnection connection) throws Exception {
}
@ -56,7 +56,7 @@ public interface ActiveMQServerPlugin {
*
* @param connection
*/
default void afterDestroyConnection(RemotingConnection connection) {
default void afterDestroyConnection(RemotingConnection connection) throws Exception {
}
@ -80,7 +80,7 @@ public interface ActiveMQServerPlugin {
default void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) {
Map<SimpleString, RoutingType> prefixes) throws Exception {
}
@ -89,7 +89,7 @@ public interface ActiveMQServerPlugin {
*
* @param session The newly created session
*/
default void afterCreateSession(ServerSession session) {
default void afterCreateSession(ServerSession session) throws Exception {
}
@ -99,7 +99,7 @@ public interface ActiveMQServerPlugin {
* @param session
* @param failed
*/
default void beforeCloseSession(ServerSession session, boolean failed) {
default void beforeCloseSession(ServerSession session, boolean failed) throws Exception {
}
@ -109,7 +109,7 @@ public interface ActiveMQServerPlugin {
* @param session
* @param failed
*/
default void afterCloseSession(ServerSession session, boolean failed) {
default void afterCloseSession(ServerSession session, boolean failed) throws Exception {
}
@ -120,7 +120,7 @@ public interface ActiveMQServerPlugin {
* @param key
* @param data
*/
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws Exception {
}
@ -131,7 +131,7 @@ public interface ActiveMQServerPlugin {
* @param key
* @param data
*/
default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws Exception {
}
@ -145,7 +145,7 @@ public interface ActiveMQServerPlugin {
* @param supportLargeMessage
*/
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) {
boolean browseOnly, boolean supportLargeMessage) throws Exception {
}
@ -154,7 +154,7 @@ public interface ActiveMQServerPlugin {
*
* @param consumer the created consumer
*/
default void afterCreateConsumer(ServerConsumer consumer) {
default void afterCreateConsumer(ServerConsumer consumer) throws Exception {
}
@ -164,7 +164,7 @@ public interface ActiveMQServerPlugin {
* @param consumer
* @param failed
*/
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws Exception {
}
@ -174,7 +174,7 @@ public interface ActiveMQServerPlugin {
* @param consumer
* @param failed
*/
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws Exception {
}
@ -183,7 +183,7 @@ public interface ActiveMQServerPlugin {
*
* @param queueConfig
*/
default void beforeCreateQueue(QueueConfig queueConfig) {
default void beforeCreateQueue(QueueConfig queueConfig) throws Exception {
}
@ -192,7 +192,7 @@ public interface ActiveMQServerPlugin {
*
* @param queue The newly created queue
*/
default void afterCreateQueue(Queue queue) {
default void afterCreateQueue(Queue queue) throws Exception {
}
@ -206,7 +206,7 @@ public interface ActiveMQServerPlugin {
* @param autoDeleteAddress
*/
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
boolean removeConsumers, boolean autoDeleteAddress) throws Exception {
}
@ -221,7 +221,7 @@ public interface ActiveMQServerPlugin {
* @param autoDeleteAddress
*/
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
boolean removeConsumers, boolean autoDeleteAddress) throws Exception {
}
@ -234,7 +234,7 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param noAutoCreateQueue
*/
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws Exception {
//by default call the old method for backwards compatibility
this.beforeSend(tx, message, direct, noAutoCreateQueue);
}
@ -250,7 +250,7 @@ public interface ActiveMQServerPlugin {
* @param result
*/
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) {
RoutingStatus result) throws Exception {
//by default call the old method for backwards compatibility
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
}
@ -264,10 +264,10 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param noAutoCreateQueue
*
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
* @deprecated use throws Exception {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
*/
@Deprecated
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws Exception {
}
@ -280,11 +280,11 @@ public interface ActiveMQServerPlugin {
* @param noAutoCreateQueue
* @param result
*
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
* @deprecated use throws Exception {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
*/
@Deprecated
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) {
RoutingStatus result) throws Exception {
}
@ -296,7 +296,7 @@ public interface ActiveMQServerPlugin {
* @param direct
* @param rejectDuplicates
*/
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception {
}
@ -310,7 +310,7 @@ public interface ActiveMQServerPlugin {
* @param result
*/
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) {
RoutingStatus result) throws Exception {
}
@ -320,7 +320,7 @@ public interface ActiveMQServerPlugin {
* @param consumer the consumer the message will be delivered to
* @param reference message reference
*/
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) {
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws Exception {
//by default call the old method for backwards compatibility
this.beforeDeliver(reference);
}
@ -331,7 +331,7 @@ public interface ActiveMQServerPlugin {
* @param consumer the consumer the message was delivered to
* @param reference message reference
*/
default void afterDeliver(ServerConsumer consumer, MessageReference reference) {
default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws Exception {
//by default call the old method for backwards compatibility
this.afterDeliver(reference);
}
@ -341,10 +341,10 @@ public interface ActiveMQServerPlugin {
*
* @param reference
*
* @deprecated use {@link #beforeDeliver(ServerConsumer, MessageReference)}
* @deprecated use throws Exception {@link #beforeDeliver(ServerConsumer, MessageReference)}
*/
@Deprecated
default void beforeDeliver(MessageReference reference) {
default void beforeDeliver(MessageReference reference) throws Exception {
}
@ -353,10 +353,10 @@ public interface ActiveMQServerPlugin {
*
* @param reference
*
* @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)}
* @deprecated use throws Exception {@link #afterDeliver(ServerConsumer, MessageReference)}
*/
@Deprecated
default void afterDeliver(MessageReference reference) {
default void afterDeliver(MessageReference reference) throws Exception {
}
@ -366,7 +366,7 @@ public interface ActiveMQServerPlugin {
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
*/
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws Exception {
}
@ -376,7 +376,7 @@ public interface ActiveMQServerPlugin {
* @param ref The acked message
* @param reason The ack reason
*/
default void messageAcknowledged(MessageReference ref, AckReason reason) {
default void messageAcknowledged(MessageReference ref, AckReason reason) throws Exception {
}
@ -385,7 +385,7 @@ public interface ActiveMQServerPlugin {
*
* @param config The bridge configuration
*/
default void beforeDeployBridge(BridgeConfiguration config) {
default void beforeDeployBridge(BridgeConfiguration config) throws Exception {
}
@ -394,7 +394,7 @@ public interface ActiveMQServerPlugin {
*
* @param bridge The newly deployed bridge
*/
default void afterDeployBridge(Bridge bridge) {
default void afterDeployBridge(Bridge bridge) throws Exception {
}