diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 7dfb1a5cef..7da5b029b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; @@ -1081,4 +1082,24 @@ public interface Configuration { Configuration setNetworkCheckPing6Command(String command); String getInternalNamingPrefix(); + + /** + * @param plugins + */ + void registerBrokerPlugins(List plugins); + + /** + * @param plugin + */ + void registerBrokerPlugin(ActiveMQServerPlugin plugin); + + /** + * @param plugin + */ + void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + + /** + * @return + */ + List getBrokerPlugins(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 2a538cac53..8edeb5b1a3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; @@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.utils.Env; @@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private List securitySettingPlugins = new ArrayList<>(); + private final List brokerPlugins = new CopyOnWriteArrayList<>(); + private Map> securityRoleNameMappings = new HashMap<>(); protected List connectorServiceConfigurations = new ArrayList<>(); @@ -1320,6 +1324,26 @@ public class ConfigurationImpl implements Configuration, Serializable { return this.securitySettingPlugins; } + @Override + public void registerBrokerPlugins(final List plugins) { + brokerPlugins.addAll(plugins); + } + + @Override + public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + brokerPlugins.add(plugin); + } + + @Override + public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + brokerPlugins.remove(plugin); + } + + @Override + public List getBrokerPlugins() { + return brokerPlugins; + } + @Override public File getBrokerInstance() { if (artemisInstance != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 2ef76571dc..a927768479 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } else { try { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null); processRoute(message, context, direct); + final RoutingStatus finalResult = result; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, + rejectDuplicates, finalResult) : null); } catch (ActiveMQAddressFullException e) { if (startedTX.get()) { context.getTransaction().rollback(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index e0e5b529dc..7c9c675721 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); if (logger.isTraceEnabled()) { logger.trace("Connection created " + connection); @@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry conn = connections.get(connectionID); if (conn != null && !conn.connection.isSupportReconnect()) { - removeConnection(connectionID); - + RemotingConnection removedConnection = removeConnection(connectionID); + if (removedConnection != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null); + } conn.connection.fail(new ActiveMQRemoteDisconnectException()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index bfd9aec351..e16557f5aa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server; -import javax.management.MBeanServer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -24,6 +23,8 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.management.MBeanServer; + import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent { */ void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception; + void registerBrokerPlugin(ActiveMQServerPlugin plugin); + + void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + + void registerBrokerPlugins(List plugins); + + List getBrokerPlugins(); + + void callBrokerPlugins(ActiveMQPluginRunnable pluginRun); + + boolean hasBrokerPlugins(); + void checkQueueCreationLimit(String username) throws Exception; ServerSession createSession(String name, @@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent { void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception; String getInternalNamingPrefix(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index d2219c29bc..70edb68aa4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent { return; } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null); + Queue queue = (Queue) binding.getBindable(); ServerLocatorInternal serverLocator; @@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent { bridge.start(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null); } public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 8482cb3705..06964ee9a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import javax.management.MBeanServer; -import javax.security.cert.X509Certificate; import java.io.File; import java.io.IOException; import java.io.PrintWriter; @@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.management.MBeanServer; +import javax.security.cert.X509Certificate; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.Pair; @@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; @@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection, + autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null); + final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes); sessions.put(name, session); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null); + return session; } @@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { return; } + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, + removeConsumers, autoDeleteAddress) : null); + addressSettingsRepository.clearCache(); Binding binding = postOffice.getBinding(queueName); @@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } callPostQueueDeletionCallbacks(address, queueName); + + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, + removeConsumers, autoDeleteAddress) : null); } @Override @@ -1807,6 +1821,38 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + @Override + public void registerBrokerPlugins(final List plugins) { + configuration.registerBrokerPlugins(plugins); + } + + @Override + public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + configuration.registerBrokerPlugin(plugin); + } + + @Override + public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + configuration.unRegisterBrokerPlugin(plugin); + } + + @Override + public List getBrokerPlugins() { + return configuration.getBrokerPlugins(); + } + + @Override + public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) { + if (pluginRun != null) { + getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin)); + } + } + + @Override + public boolean hasBrokerPlugins() { + return !getBrokerPlugins().isEmpty(); + } + @Override public ExecutorFactory getExecutorFactory() { return executorFactory; @@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService); - queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager); + queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this); pagingManager = createPagingManager(); @@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build(); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null); + final Queue queue = queueFactory.createQueueWith(queueConfig); if (transientQueue) { @@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { callPostQueueCreationCallbacks(queue.getName()); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + return queue; } @@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer { deployAddressesFromConfiguration(config); } } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index ceec92c0ce..8370839bda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; @@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 9258a0742c..3d8ceb12c2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; @@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory { protected final ExecutorFactory executorFactory; + protected final ActiveMQServer server; + public QueueFactoryImpl(final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutor, final HierarchicalRepository addressSettingsRepository, - final StorageManager storageManager) { + final StorageManager storageManager, + final ActiveMQServer server) { + this.addressSettingsRepository = addressSettingsRepository; - this.scheduledExecutor = scheduledExecutor; - this.storageManager = storageManager; - this.executorFactory = executorFactory; + this.server = server; } @Override @@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory { final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } return queue; } @@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } else { - queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } return queue; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a62ae79498..c2cfdef03d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; @@ -198,6 +198,8 @@ public class QueueImpl implements Queue { private final HierarchicalRepository addressSettingsRepository; + private final ActiveMQServer server; + private final ScheduledExecutorService scheduledExecutor; private final SimpleString address; @@ -330,8 +332,9 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor) { - this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } public QueueImpl(final long id, @@ -347,8 +350,9 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor) { - this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } public QueueImpl(final long id, @@ -367,7 +371,8 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor) { + final Executor executor, + final ActiveMQServer server) { this.id = id; @@ -401,6 +406,8 @@ public class QueueImpl implements Queue { this.scheduledExecutor = scheduledExecutor; + this.server = server; + scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor); if (addressSettingsRepository != null) { @@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue { messagesAcknowledged++; } + if (server != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null); + } } @Override @@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue { } else { messagesAcknowledged++; } + + if (server != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null); + } } @Override @@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue { } acknowledge(ref, AckReason.EXPIRED); } + + if (server != null) { + final SimpleString expiryAddress = messageExpiryAddress; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null); + } } private SimpleString expiryAddressFromMessageAddress(MessageReference ref) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 9e33602078..af8524d198 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { try { Message message = reference.getMessage(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null); + if (message.isLargeMessage() && supportLargeMessage) { if (largeMessageDeliverer == null) { // This can't really happen as handle had already crated the deliverer @@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); callback.afterDelivery(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null); } } @@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null); + setStarted(false); LargeMessageDeliverer del = largeMessageDeliverer; @@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { managementService.sendNotification(notification); } + + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index edd7afc875..724584313a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,10 +16,8 @@ */ package org.apache.activemq.artemis.core.server.impl; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObjectBuilder; -import javax.transaction.xa.XAException; -import javax.transaction.xa.Xid; +import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,6 +29,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObjectBuilder; +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; @@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.TypedProperties; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; - /** * Server side Session implementation */ @@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void doClose(final boolean failed) throws Exception { synchronized (this) { + if (!closed) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null); + } this.setStarted(false); if (closed) return; @@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } closed = true; + + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null); } } @@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { Filter filter = FilterImpl.createFilter(filterString); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName, + filterString, browseOnly, supportLargeMessage) : null); + ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); consumers.put(consumer.getID(), consumer); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null); + if (!browseOnly) { TypedProperties props = new TypedProperties(); @@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean direct, boolean noAutoCreateQueue) throws Exception { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null); + // If the protocol doesn't support flow control, we have no choice other than fail the communication if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); @@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (message.getAddressSimpleString().equals(managementAddress)) { // It's a management message - handleManagementMessage(tx, message, direct); + result = handleManagementMessage(tx, message, direct); } else { result = doSend(tx, message, address, direct, noAutoCreateQueue); } + + final RoutingStatus finalResult = result; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null); + return result; } @@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void addMetaData(String key, String data) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null); if (metaData == null) { metaData = new HashMap<>(); } metaData.put(key, data); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java new file mode 100644 index 0000000000..bc85475cd7 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +public interface ActiveMQPluginRunnable { + + void run(ActiveMQServerPlugin plugin); + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java new file mode 100644 index 0000000000..95296f0909 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; + + +public interface ActiveMQServerPlugin { + + + /** + * A connection has been created. + * + * @param connection The newly created connection + */ + default void afterCreateConnection(RemotingConnection connection) { + + } + + /** + * A connection has been destroyed. + * + * @param connection + */ + default void afterDestroyConnection(RemotingConnection connection) { + + } + + /** + * Before a session is created. + * + * @param name + * @param username + * @param minLargeMessageSize + * @param connection + * @param autoCommitSends + * @param autoCommitAcks + * @param preAcknowledge + * @param xa + * @param defaultAddress + * @param callback + * @param autoCreateQueues + * @param context + * @param prefixes + */ + default void beforeCreateSession(String name, String username, int minLargeMessageSize, + RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, + boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, + Map prefixes) { + + } + + /** + * After a session has been created. + * + * @param session The newly created session + */ + default void afterCreateSession(ServerSession session) { + + } + + /** + * Before a session is closed + * + * @param session + * @param failed + */ + default void beforeCloseSession(ServerSession session, boolean failed) { + + } + + /** + * After a session is closed + * + * @param session + * @param failed + */ + default void afterCloseSession(ServerSession session, boolean failed) { + + } + + /** + * Before session metadata is added to the session + * + * @param session + * @param key + * @param data + */ + default void beforeSessionMetadataAdded(ServerSession session, String key, String data) { + + } + + /** + * After session metadata is added to the session + * + * @param session + * @param key + * @param data + */ + default void afterSessionMetadataAdded(ServerSession session, String key, String data) { + + } + + /** + * Before a consumer is created + * + * @param consumerID + * @param queueName + * @param filterString + * @param browseOnly + * @param supportLargeMessage + */ + default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) { + + } + + /** + * After a consumer has been created + * + * @param consumer the created consumer + */ + default void afterCreateConsumer(ServerConsumer consumer) { + + } + + /** + * Before a consumer is closed + * + * @param consumer + * @param failed + */ + default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + + } + + /** + * After a consumer is closed + * + * @param consumer + * @param failed + */ + default void afterCloseConsumer(ServerConsumer consumer, boolean failed) { + + } + + /** + * Before a queue is created + * + * @param queueConfig + */ + default void beforeCreateQueue(QueueConfig queueConfig) { + + } + + /** + * After a queue has been created + * + * @param queue The newly created queue + */ + default void afterCreateQueue(Queue queue) { + + } + + /** + * Before a queue is destroyed + * + * @param queueName + * @param session + * @param checkConsumerCount + * @param removeConsumers + * @param autoDeleteAddress + */ + default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + + } + + /** + * After a queue has been destroyed + * + * @param queue + * @param address + * @param session + * @param checkConsumerCount + * @param removeConsumers + * @param autoDeleteAddress + */ + default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + + } + + /** + * Before a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + */ + default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + + } + + /** + * After a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param result + */ + default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) { + + } + + /** + * Before a message is routed + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + */ + default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { + + } + + /** + * After a message is routed + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @param result + */ + default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) { + + } + + /** + * Before a message is delivered to a client consumer + * + * @param reference + */ + default void beforeDeliver(MessageReference reference) { + + } + + /** + * After a message is delivered to a client consumer + * + * @param reference + */ + default void afterDeliver(MessageReference reference) { + + } + + /** + * A message has been expired + * + * @param message The expired message + * @param messageExpiryAddress The message expiry address if exists + */ + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + + } + + /** + * A message has been acknowledged + * + * @param ref The acked message + * @param reason The ack reason + */ + default void messageAcknowledged(MessageReference ref, AckReason reason) { + + } + + /** + * Before a bridge is deployed + * + * @param config The bridge configuration + */ + default void beforeDeployBridge(BridgeConfiguration config) { + + } + + /** + * After a bridge has been deployed + * + * @param bridge The newly deployed bridge + */ + default void afterDeployBridge(Bridge bridge) { + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 8d278954b8..60b9b74b21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport { // Add optional security for tests that need it configureBrokerSecurity(server); + // Add extra configuration + addConfiguration(server); + server.start(); // Prepare all addresses and queues for client tests. @@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport { return server; } + protected void addConfiguration(ActiveMQServer server) { + + } + protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { HashMap params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 201a96b2c9..da695ca0a3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import java.util.LinkedList; import java.util.Map; @@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import javax.management.MBeanServer; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; - import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; @@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, final ActiveMQServer server) { + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, + maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, + addressSettingsRepository, executor, server); } @Override @@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase { LocalFactory(final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutor, final HierarchicalRepository addressSettingsRepository, - final StorageManager storageManager) { - super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager); + final StorageManager storageManager, final ActiveMQServer server) { + super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server); } @Override public Queue createQueueWith(final QueueConfig config) { - queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), + config.user(), config.pageSubscription(), config.isDurable(), + config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), + config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, + postOffice, storageManager, addressSettingsRepository, + executorFactory.getExecutor(), server); return queue; } @@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated) { - queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, + temporary, autoCreated, RoutingType.MULTICAST, null, null, + scheduledExecutor, postOffice, storageManager, addressSettingsRepository, + executorFactory.getExecutor(), server); return queue; } } - LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager()); + LocalFactory queueFactory = + new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), + server.getAddressSettingsRepository(), server.getStorageManager(), server); queueFactory.setPostOffice(server.getPostOffice()); @@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase { long txID = server.getStorageManager().generateID(); // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, + new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, + false, null, null, null, null, null, null), + server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 540baf64f2..44015e1992 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; @@ -29,9 +23,17 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { StorageManager storageManager, HierarchicalRepository addressSettingsRepository, Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, + postOffice, storageManager, addressSettingsRepository, executor, null); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index f8094a1ea1..63743edf39 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -16,14 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.jms.client; +import java.util.List; +import java.util.Map; + import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import java.util.List; -import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; @@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase { for (int i = 0; i < 100; i++) { long txid = storage.generateID(); - final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); + final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), + SimpleString.toSimpleString("topic"), + FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, + true, false, false, server.getScheduledPool(), server.getPostOffice(), + storage, server.getAddressSettingsRepository(), + server.getExecutorFactory().getExecutor(), server); LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java index 056891aed9..bd8cfd86b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java @@ -45,7 +45,7 @@ public class JmsResourceProvider { /** * Creates a connection. * - * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory) + * @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory) */ public Connection createConnection(ConnectionFactory cf) throws JMSException { Connection connection = cf.createConnection(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java new file mode 100644 index 0000000000..d918b2766e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic send and receive scenarios using only AMQP sender and receiver links. + */ +public class AmqpPluginTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class); + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected void addConfiguration(ActiveMQServer server) { + super.addConfiguration(server); + server.registerBrokerPlugin(verifier); + } + + @Test(timeout = 60000) + public void testQueueReceiverReadAndAckMessage() throws Exception { + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + Queue queueView = getProxyToQueue(getQueueName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + receiver.close(); + connection.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, + AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER); + } + + @Override + public void sendMessages(String destinationName, int count) throws Exception { + sendMessages(destinationName, count, null); + } + + @Override + public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + if (routingType != null) { + message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType()); + } + sender.send(message); + } + } finally { + connection.close(); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java new file mode 100644 index 0000000000..9f6b4ea931 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; +import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +public class CorePluginTest extends JMSTestBase { + + private Queue queue; + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); + + @Override + protected Configuration createDefaultConfig(boolean netty) throws Exception { + Configuration config = super.createDefaultConfig(netty); + config.registerBrokerPlugin(verifier); + return config; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + queue = createQueue("queue1"); + } + + + @Test + public void testSendReceive() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + MessageConsumer cons = sess.createConsumer(queue); + + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + TextMessage received1 = (TextMessage)cons.receive(1000); + assertNotNull(received1); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE); + verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + } + + @Test + public void testDestroyQueue() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + sess.createProducer(queue); + conn.close(); + + server.destroyQueue(new SimpleString(queue.getQueueName())); + + verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE, + AFTER_DESTROY_QUEUE); + } + + @Test + public void testMessageExpireServer() throws Exception { + server.registerBrokerPlugin(new ExpiredPluginVerifier()); + + conn = cf.createConnection(); + conn.setClientID("test"); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + prod.setTimeToLive(1); + MessageConsumer cons = sess.createConsumer(queue); + Thread.sleep(100); + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + Thread.sleep(100); + assertNull(cons.receive(100)); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + + } + + @Test + public void testMessageExpireClient() throws Exception { + server.registerBrokerPlugin(new ExpiredPluginVerifier()); + + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + prod.setTimeToLive(500); + MessageConsumer cons = sess.createConsumer(queue); + + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + Thread.sleep(500); + assertNull(cons.receive(500)); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + + } + + @Test + public void testSimpleBridge() throws Exception { + server.stop(); + ActiveMQServer server0; + ActiveMQServer server1; + + Map server0Params = new HashMap<>(); + server0 = createClusteredServerWithParams(false, 0, false, server0Params); + + Map server1Params = new HashMap<>(); + server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1); + server1 = createClusteredServerWithParams(false, 1, false, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + + TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params); + + HashMap connectors = new HashMap<>(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + server0.registerBrokerPlugin(verifier); + + ArrayList connectorConfig = new ArrayList<>(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setStaticConnectors(connectorConfig); + + List bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + server1.start(); + server0.start(); + + verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + + server0.stop(); + server1.stop(); + + } + + private class ExpiredPluginVerifier implements ActiveMQServerPlugin { + + @Override + public void messageAcknowledged(MessageReference ref, AckReason reason) { + assertEquals(AckReason.EXPIRED, reason); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java new file mode 100644 index 0000000000..14aa4a19c8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -0,0 +1,276 @@ +/** + * + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; + + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +public class MethodCalledVerifier implements ActiveMQServerPlugin { + + private final Map methodCalls; + + public static final String AFTER_CREATE_CONNECTION = "afterCreateConnection"; + public static final String AFTER_DESTROY_CONNECTION = "afterDestroyConnection"; + public static final String BEFORE_CREATE_SESSION = "beforeCreateSession"; + public static final String AFTER_CREATE_SESSION = "afterCreateSession"; + public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession"; + public static final String AFTER_CLOSE_SESSION = "afterCloseSession"; + public static final String BEFORE_SESSION_METADATA_ADDED = "beforeSessionMetadataAdded"; + public static final String AFTER_SESSION_METADATA_ADDED = "afterSessionMetadataAdded"; + public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer"; + public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer"; + public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer"; + public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer"; + public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue"; + public static final String AFTER_CREATE_QUEUE = "afterCreateQueue"; + public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue"; + public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue"; + public static final String MESSAGE_EXPIRED = "messageExpired"; + public static final String MESSAGE_ACKED = "messageAcknowledged"; + public static final String BEFORE_SEND = "beforeSend"; + public static final String AFTER_SEND = "afterSend"; + public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute"; + public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute"; + public static final String BEFORE_DELIVER = "beforeDeliver"; + public static final String AFTER_DELIVER = "afterDeliver"; + public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; + public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge"; + + /** + * @param methods + */ + public MethodCalledVerifier(Map methodCalls) { + super(); + this.methodCalls = methodCalls; + } + + @Override + public void afterCreateConnection(RemotingConnection connection) { + Preconditions.checkNotNull(connection); + methodCalled(AFTER_CREATE_CONNECTION); + } + + @Override + public void afterDestroyConnection(RemotingConnection connection) { + Preconditions.checkNotNull(connection); + methodCalled(AFTER_DESTROY_CONNECTION); + } + + @Override + public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, + boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, + String defaultAddress, SessionCallback callback, boolean autoCreateQueues, + OperationContext context, Map prefixes) { + Preconditions.checkNotNull(connection); + methodCalled(BEFORE_CREATE_SESSION); + } + + @Override + public void afterCreateSession(ServerSession session) { + Preconditions.checkNotNull(session); + methodCalled(AFTER_CREATE_SESSION); + } + + @Override + public void beforeCloseSession(ServerSession session, boolean failed) { + Preconditions.checkNotNull(session); + methodCalled(BEFORE_CLOSE_SESSION); + } + + @Override + public void afterCloseSession(ServerSession session, boolean failed) { + Preconditions.checkNotNull(session); + methodCalled(AFTER_CLOSE_SESSION); + } + + @Override + public void beforeSessionMetadataAdded(ServerSession session, String key, String data) { + Preconditions.checkNotNull(key); + methodCalled(BEFORE_SESSION_METADATA_ADDED); + } + + @Override + public void afterSessionMetadataAdded(ServerSession session, String key, String data) { + Preconditions.checkNotNull(key); + methodCalled(AFTER_SESSION_METADATA_ADDED); + } + + @Override + public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) { + Preconditions.checkNotNull(queueName); + methodCalled(BEFORE_CREATE_CONSUMER); + } + + @Override + public void afterCreateConsumer(ServerConsumer consumer) { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CREATE_CONSUMER); + } + + @Override + public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + Preconditions.checkNotNull(consumer); + methodCalled(BEFORE_CLOSE_CONSUMER); + } + + @Override + public void afterCloseConsumer(ServerConsumer consumer, boolean failed) { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CLOSE_CONSUMER); + } + + @Override + public void beforeCreateQueue(QueueConfig queueConfig) { + Preconditions.checkNotNull(queueConfig); + methodCalled(BEFORE_CREATE_QUEUE); + } + + @Override + public void afterCreateQueue(org.apache.activemq.artemis.core.server.Queue queue) { + Preconditions.checkNotNull(queue); + methodCalled(AFTER_CREATE_QUEUE); + } + + @Override + public void beforeDestroyQueue(SimpleString queueName, SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + Preconditions.checkNotNull(queueName); + methodCalled(BEFORE_DESTROY_QUEUE); + } + + @Override + public void afterDestroyQueue(Queue queue, SimpleString address, SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + Preconditions.checkNotNull(queue); + methodCalled(AFTER_DESTROY_QUEUE); + } + + @Override + public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + Preconditions.checkNotNull(message); + methodCalled(MESSAGE_EXPIRED); + } + + @Override + public void messageAcknowledged(MessageReference ref, AckReason reason) { + Preconditions.checkNotNull(ref); + Preconditions.checkNotNull(reason); + methodCalled(MESSAGE_ACKED); + } + + @Override + public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + Preconditions.checkNotNull(message); + methodCalled(BEFORE_SEND); + } + + @Override + public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(result); + methodCalled(AFTER_SEND); + } + + @Override + public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + methodCalled(BEFORE_MESSAGE_ROUTE); + } + + @Override + public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + Preconditions.checkNotNull(result); + methodCalled(AFTER_MESSAGE_ROUTE); + } + + @Override + public void beforeDeliver(MessageReference reference) { + Preconditions.checkNotNull(reference); + methodCalled(BEFORE_DELIVER); + } + + @Override + public void afterDeliver(MessageReference reference) { + Preconditions.checkNotNull(reference); + methodCalled(AFTER_DELIVER); + } + + @Override + public void beforeDeployBridge(BridgeConfiguration config) { + Preconditions.checkNotNull(config); + methodCalled(BEFORE_DEPLOY_BRIDGE); + } + + @Override + public void afterDeployBridge(Bridge bridge) { + Preconditions.checkNotNull(bridge); + methodCalled(AFTER_DEPLOY_BRIDGE); + } + + public void validatePluginMethodsEquals(int count, String... names) { + Arrays.asList(names).forEach(name -> { + assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get()); + }); + } + + public void validatePluginMethodsAtLeast(int count, String... names) { + Arrays.asList(names).forEach(name -> { + assertTrue("validating method " + name, count <= methodCalls.getOrDefault(name, new AtomicInteger()).get()); + }); + } + + private void methodCalled(String name) { + methodCalls.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java new file mode 100644 index 0000000000..5e7f12706f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.junit.Before; +import org.junit.Test; + +public class MqttPluginTest extends MQTTTestSupport { + + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + @Before + public void setUp() throws Exception { + Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); + sessions.setAccessible(true); + sessions.set(null, new ConcurrentHashMap<>()); + + Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); + connectedClients.setAccessible(true); + connectedClients.set(null, new ConcurrentHashSet<>()); + super.setUp(); + + } + + @Override + public void configureBroker() throws Exception { + super.configureBroker(); + server.registerBrokerPlugin(verifier); + } + + @Test(timeout = 60 * 1000) + public void testSendAndReceiveMQTT() throws Exception { + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); + + final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < NUM_MESSAGES; i++) { + try { + byte[] payload = subscriptionProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String payload = "Message " + i; + publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); + } + + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + subscriptionProvider.disconnect(); + publishProvider.disconnect(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java new file mode 100644 index 0000000000..afb6841d72 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +public class OpenwirePluginTest extends BasicOpenWireTest { + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, long pageSize, + long maxAddressSize, Map settings) { + ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings); + server.registerBrokerPlugin(verifier); + return server; + } + + @Test + public void testAckedMessageAreConsumed() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + connection.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java new file mode 100644 index 0000000000..c771272bef --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StompPluginTest extends StompTestBase { + + private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + public static final String CLIENT_ID = "myclientid"; + + private StompClientConnectionV12 conn; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conn = (StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + } + + @Override + @After + public void tearDown() throws Exception { + try { + boolean connected = conn != null && conn.isConnected(); + log.debug("Connection 1.2 : " + connected); + if (connected) { + conn.disconnect(); + } + } finally { + super.tearDown(); + } + } + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected JMSServerManager createServer() throws Exception { + JMSServerManager server = super.createServer(); + server.getActiveMQServer().registerBrokerPlugin(verifier); + return server; + } + + @Test + public void testSendAndReceive() throws Exception { + + // subscribehoward county escaped + StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + newConn.connect(defUser, defPass); + subscribe(newConn, "a-sub"); + + send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); + ClientStompFrame frame = newConn.receiveFrame(); + + System.out.println("received " + frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + + // unsub + unsubscribe(newConn, "a-sub"); + + newConn.disconnect(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + + } + +} diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java index c0a41128f4..121a4b01a5 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java @@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduledNoConsumer() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, + false, scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); // Send one scheduled @@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduled() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, + false, scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); FakeConsumer consumer = null; @@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase { public void disconnect() { } }; - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, + scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); MessageReference messageReference = generateReference(queue, 1); queue.addConsumer(consumer); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 78179a8ca4..0a08eb6078 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase { } private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) { - return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, executor); + return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, + new FakePostOffice(), null, null, executor, null); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 06c7e1e6b9..40c117aa20 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory { @Override public Queue createQueueWith(final QueueConfig config) { - return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor); + return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), + config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), + scheduledExecutor, postOffice, null, null, executor, null); } @Deprecated @@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory { final boolean durable, final boolean temporary, final boolean autoCreated) { - return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, null, executor); + return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, + scheduledExecutor, postOffice, null, null, executor, null); } @Override