This closes #1446
This commit is contained in:
Clebert Suconic 2017-08-07 16:31:19 -04:00
commit 8f33d276d7
7 changed files with 77 additions and 48 deletions

View File

@ -514,8 +514,13 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
} }
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
} catch (ActiveMQException t) {
logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);
throw new IllegalStateException(t.getMessage(), t.getCause());
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Connection created " + connection); logger.trace("Connection created " + connection);
} }
@ -537,7 +542,13 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
if (conn != null && !conn.connection.isSupportReconnect()) { if (conn != null && !conn.connection.isSupportReconnect()) {
RemotingConnection removedConnection = removeConnection(connectionID); RemotingConnection removedConnection = removeConnection(connectionID);
if (removedConnection != null) { if (removedConnection != null) {
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
} catch (ActiveMQException t) {
logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
conn.connection.fail(t);
return;
}
} }
conn.connection.fail(new ActiveMQRemoteDisconnectException()); conn.connection.fail(new ActiveMQRemoteDisconnectException());
} }

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -198,7 +199,7 @@ public interface ActiveMQServer extends ServiceComponent {
List<ActiveMQServerPlugin> getBrokerPlugins(); List<ActiveMQServerPlugin> getBrokerPlugins();
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun); void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
boolean hasBrokerPlugins(); boolean hasBrokerPlugins();

View File

@ -16,11 +16,12 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import javax.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import javax.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
@ -208,9 +209,9 @@ public interface ServerSession extends SecurityAuth {
Set<ServerConsumer> getServerConsumers(); Set<ServerConsumer> getServerConsumers();
void addMetaData(String key, String data); void addMetaData(String key, String data) throws Exception;
boolean addUniqueMetaData(String key, String data); boolean addUniqueMetaData(String key, String data) throws Exception;
String getMetaData(String key); String getMetaData(String key);

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -48,8 +47,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -164,11 +166,11 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SecurityFormatter; import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -1843,9 +1845,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
@Override @Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) { public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
if (pluginRun != null) { if (pluginRun != null) {
getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin)); for (ActiveMQServerPlugin plugin : getBrokerPlugins()) {
try {
pluginRun.run(plugin);
} catch (Throwable e) {
if (e instanceof ActiveMQException) {
logger.debug("plugin " + plugin + " is throwing ActiveMQException");
throw (ActiveMQException) e;
} else {
logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e);
}
}
}
} }
} }

View File

@ -1383,7 +1383,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
@Override @Override
public void addMetaData(String key, String data) { public void addMetaData(String key, String data) throws Exception {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
if (metaData == null) { if (metaData == null) {
metaData = new HashMap<>(); metaData = new HashMap<>();
@ -1393,7 +1393,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
@Override @Override
public boolean addUniqueMetaData(String key, String data) { public boolean addUniqueMetaData(String key, String data) throws Exception {
ServerSession sessionWithMetaData = server.lookupSession(key, data); ServerSession sessionWithMetaData = server.lookupSession(key, data);
if (sessionWithMetaData != null && sessionWithMetaData != this) { if (sessionWithMetaData != null && sessionWithMetaData != this) {
// There is a duplication of this property // There is a duplication of this property

View File

@ -17,8 +17,10 @@
package org.apache.activemq.artemis.core.server.plugin; package org.apache.activemq.artemis.core.server.plugin;
import org.apache.activemq.artemis.api.core.ActiveMQException;
public interface ActiveMQPluginRunnable { public interface ActiveMQPluginRunnable {
void run(ActiveMQServerPlugin plugin); void run(ActiveMQServerPlugin plugin) throws ActiveMQException;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.plugin;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -47,7 +48,7 @@ public interface ActiveMQServerPlugin {
* *
* @param connection The newly created connection * @param connection The newly created connection
*/ */
default void afterCreateConnection(RemotingConnection connection) { default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
} }
@ -56,7 +57,7 @@ public interface ActiveMQServerPlugin {
* *
* @param connection * @param connection
*/ */
default void afterDestroyConnection(RemotingConnection connection) { default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
} }
@ -80,7 +81,7 @@ public interface ActiveMQServerPlugin {
default void beforeCreateSession(String name, String username, int minLargeMessageSize, default void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) { Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
} }
@ -89,7 +90,7 @@ public interface ActiveMQServerPlugin {
* *
* @param session The newly created session * @param session The newly created session
*/ */
default void afterCreateSession(ServerSession session) { default void afterCreateSession(ServerSession session) throws ActiveMQException {
} }
@ -99,7 +100,7 @@ public interface ActiveMQServerPlugin {
* @param session * @param session
* @param failed * @param failed
*/ */
default void beforeCloseSession(ServerSession session, boolean failed) { default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
} }
@ -109,7 +110,7 @@ public interface ActiveMQServerPlugin {
* @param session * @param session
* @param failed * @param failed
*/ */
default void afterCloseSession(ServerSession session, boolean failed) { default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
} }
@ -120,7 +121,7 @@ public interface ActiveMQServerPlugin {
* @param key * @param key
* @param data * @param data
*/ */
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) { default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
} }
@ -131,7 +132,7 @@ public interface ActiveMQServerPlugin {
* @param key * @param key
* @param data * @param data
*/ */
default void afterSessionMetadataAdded(ServerSession session, String key, String data) { default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
} }
@ -145,7 +146,7 @@ public interface ActiveMQServerPlugin {
* @param supportLargeMessage * @param supportLargeMessage
*/ */
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) { boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
} }
@ -154,7 +155,7 @@ public interface ActiveMQServerPlugin {
* *
* @param consumer the created consumer * @param consumer the created consumer
*/ */
default void afterCreateConsumer(ServerConsumer consumer) { default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
} }
@ -164,7 +165,7 @@ public interface ActiveMQServerPlugin {
* @param consumer * @param consumer
* @param failed * @param failed
*/ */
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
} }
@ -174,7 +175,7 @@ public interface ActiveMQServerPlugin {
* @param consumer * @param consumer
* @param failed * @param failed
*/ */
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) { default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
} }
@ -183,7 +184,7 @@ public interface ActiveMQServerPlugin {
* *
* @param queueConfig * @param queueConfig
*/ */
default void beforeCreateQueue(QueueConfig queueConfig) { default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
} }
@ -192,7 +193,7 @@ public interface ActiveMQServerPlugin {
* *
* @param queue The newly created queue * @param queue The newly created queue
*/ */
default void afterCreateQueue(Queue queue) { default void afterCreateQueue(Queue queue) throws ActiveMQException {
} }
@ -206,7 +207,7 @@ public interface ActiveMQServerPlugin {
* @param autoDeleteAddress * @param autoDeleteAddress
*/ */
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) { boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
} }
@ -221,7 +222,7 @@ public interface ActiveMQServerPlugin {
* @param autoDeleteAddress * @param autoDeleteAddress
*/ */
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) { boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
} }
@ -234,7 +235,7 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
*/ */
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
this.beforeSend(tx, message, direct, noAutoCreateQueue); this.beforeSend(tx, message, direct, noAutoCreateQueue);
} }
@ -250,7 +251,7 @@ public interface ActiveMQServerPlugin {
* @param result * @param result
*/ */
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) { RoutingStatus result) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
this.afterSend(tx, message, direct, noAutoCreateQueue, result); this.afterSend(tx, message, direct, noAutoCreateQueue, result);
} }
@ -264,10 +265,10 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* *
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} * @deprecated use throws ActiveMQException {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
*/ */
@Deprecated @Deprecated
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
} }
@ -280,11 +281,11 @@ public interface ActiveMQServerPlugin {
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @param result * @param result
* *
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} * @deprecated use throws ActiveMQException {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
*/ */
@Deprecated @Deprecated
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) { RoutingStatus result) throws ActiveMQException {
} }
@ -296,7 +297,7 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param rejectDuplicates * @param rejectDuplicates
*/ */
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
} }
@ -310,7 +311,7 @@ public interface ActiveMQServerPlugin {
* @param result * @param result
*/ */
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) { RoutingStatus result) throws ActiveMQException {
} }
@ -320,7 +321,7 @@ public interface ActiveMQServerPlugin {
* @param consumer the consumer the message will be delivered to * @param consumer the consumer the message will be delivered to
* @param reference message reference * @param reference message reference
*/ */
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) { default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
this.beforeDeliver(reference); this.beforeDeliver(reference);
} }
@ -331,7 +332,7 @@ public interface ActiveMQServerPlugin {
* @param consumer the consumer the message was delivered to * @param consumer the consumer the message was delivered to
* @param reference message reference * @param reference message reference
*/ */
default void afterDeliver(ServerConsumer consumer, MessageReference reference) { default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
//by default call the old method for backwards compatibility //by default call the old method for backwards compatibility
this.afterDeliver(reference); this.afterDeliver(reference);
} }
@ -341,10 +342,10 @@ public interface ActiveMQServerPlugin {
* *
* @param reference * @param reference
* *
* @deprecated use {@link #beforeDeliver(ServerConsumer, MessageReference)} * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)}
*/ */
@Deprecated @Deprecated
default void beforeDeliver(MessageReference reference) { default void beforeDeliver(MessageReference reference) throws ActiveMQException {
} }
@ -353,10 +354,10 @@ public interface ActiveMQServerPlugin {
* *
* @param reference * @param reference
* *
* @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)} * @deprecated use throws ActiveMQException {@link #afterDeliver(ServerConsumer, MessageReference)}
*/ */
@Deprecated @Deprecated
default void afterDeliver(MessageReference reference) { default void afterDeliver(MessageReference reference) throws ActiveMQException {
} }
@ -366,7 +367,7 @@ public interface ActiveMQServerPlugin {
* @param message The expired message * @param message The expired message
* @param messageExpiryAddress The message expiry address if exists * @param messageExpiryAddress The message expiry address if exists
*/ */
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
} }
@ -376,7 +377,7 @@ public interface ActiveMQServerPlugin {
* @param ref The acked message * @param ref The acked message
* @param reason The ack reason * @param reason The ack reason
*/ */
default void messageAcknowledged(MessageReference ref, AckReason reason) { default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
} }
@ -385,7 +386,7 @@ public interface ActiveMQServerPlugin {
* *
* @param config The bridge configuration * @param config The bridge configuration
*/ */
default void beforeDeployBridge(BridgeConfiguration config) { default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
} }
@ -394,7 +395,7 @@ public interface ActiveMQServerPlugin {
* *
* @param bridge The newly deployed bridge * @param bridge The newly deployed bridge
*/ */
default void afterDeployBridge(Bridge bridge) { default void afterDeployBridge(Bridge bridge) throws ActiveMQException {
} }