ARTEMIS-898 - Adding Plugin Support

Adding a new ActievMQServerPlugin interface to support adding custom
behavior to the broker at certain events such as connection or session
creation.

https://issues.apache.org/jira/browse/ARTEMIS-898
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-05-02 09:46:17 -04:00
parent 303d97c76d
commit 1e1ede84c0
28 changed files with 1664 additions and 58 deletions

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@ -1081,4 +1082,24 @@ public interface Configuration {
Configuration setNetworkCheckPing6Command(String command);
String getInternalNamingPrefix();
/**
* @param plugins
*/
void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
/**
* @param plugin
*/
void registerBrokerPlugin(ActiveMQServerPlugin plugin);
/**
* @param plugin
*/
void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
/**
* @return
*/
List<ActiveMQServerPlugin> getBrokerPlugins();
}

View File

@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.utils.Env;
@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<>();
@ -1320,6 +1324,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this.securitySettingPlugins;
}
@Override
public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
brokerPlugins.addAll(plugins);
}
@Override
public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
brokerPlugins.add(plugin);
}
@Override
public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
brokerPlugins.remove(plugin);
}
@Override
public List<ActiveMQServerPlugin> getBrokerPlugins() {
return brokerPlugins;
}
@Override
public File getBrokerInstance() {
if (artemisInstance != null) {

View File

@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
} else {
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
processRoute(message, context, direct);
final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
rejectDuplicates, finalResult) : null);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();

View File

@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
if (logger.isTraceEnabled()) {
logger.trace("Connection created " + connection);
@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
ConnectionEntry conn = connections.get(connectionID);
if (conn != null && !conn.connection.isSupportReconnect()) {
removeConnection(connectionID);
RemotingConnection removedConnection = removeConnection(connectionID);
if (removedConnection != null) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
}
conn.connection.fail(new ActiveMQRemoteDisconnectException());
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server;
import javax.management.MBeanServer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -24,6 +23,8 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
void registerBrokerPlugin(ActiveMQServerPlugin plugin);
void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
List<ActiveMQServerPlugin> getBrokerPlugins();
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
boolean hasBrokerPlugins();
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,
@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent {
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
String getInternalNamingPrefix();
}

View File

@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent {
return;
}
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null);
Queue queue = (Queue) binding.getBindable();
ServerLocatorInternal serverLocator;
@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent {
bridge.start();
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null);
}
public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
sessions.put(name, session);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
return session;
}
@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return;
}
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
addressSettingsRepository.clearCache();
Binding binding = postOffice.getBinding(queueName);
@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callPostQueueDeletionCallbacks(address, queueName);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
}
@Override
@ -1807,6 +1821,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
configuration.registerBrokerPlugins(plugins);
}
@Override
public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
configuration.registerBrokerPlugin(plugin);
}
@Override
public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
configuration.unRegisterBrokerPlugin(plugin);
}
@Override
public List<ActiveMQServerPlugin> getBrokerPlugins() {
return configuration.getBrokerPlugins();
}
@Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
if (pluginRun != null) {
getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin));
}
}
@Override
public boolean hasBrokerPlugins() {
return !getBrokerPlugins().isEmpty();
}
@Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;
@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService);
queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this);
pagingManager = createPagingManager();
@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callPostQueueCreationCallbacks(queue.getName());
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
return queue;
}
@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployAddressesFromConfiguration(config);
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
final Executor executor,
final ActiveMQServer server) {
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
@Override

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory {
protected final ExecutorFactory executorFactory;
protected final ActiveMQServer server;
public QueueFactoryImpl(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final StorageManager storageManager) {
final StorageManager storageManager,
final ActiveMQServer server) {
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.server = server;
}
@Override
@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;
}
@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
@ -198,6 +198,8 @@ public class QueueImpl implements Queue {
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private final ActiveMQServer server;
private final ScheduledExecutorService scheduledExecutor;
private final SimpleString address;
@ -330,8 +332,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
final Executor executor,
final ActiveMQServer server) {
this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@ -347,8 +350,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
final Executor executor,
final ActiveMQServer server) {
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@ -367,7 +371,8 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
final Executor executor,
final ActiveMQServer server) {
this.id = id;
@ -401,6 +406,8 @@ public class QueueImpl implements Queue {
this.scheduledExecutor = scheduledExecutor;
this.server = server;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
if (addressSettingsRepository != null) {
@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue {
messagesAcknowledged++;
}
if (server != null) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
}
}
@Override
@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue {
} else {
messagesAcknowledged++;
}
if (server != null) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
}
}
@Override
@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue {
}
acknowledge(ref, AckReason.EXPIRED);
}
if (server != null) {
final SimpleString expiryAddress = messageExpiryAddress;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null);
}
}
private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {

View File

@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
try {
Message message = reference.getMessage();
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null);
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} finally {
lockDelivery.readLock().unlock();
callback.afterDelivery();
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null);
}
}
@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null);
setStarted(false);
LargeMessageDeliverer del = largeMessageDeliverer;
@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
managementService.sendNotification(notification);
}
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null);
}
@Override

View File

@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -31,6 +29,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
/**
* Server side Session implementation
*/
@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doClose(final boolean failed) throws Exception {
synchronized (this) {
if (!closed) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
}
this.setStarted(false);
if (closed)
return;
@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
closed = true;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null);
}
}
@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Filter filter = FilterImpl.createFilter(filterString);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
filterString, browseOnly, supportLargeMessage) : null);
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null);
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
return result;
}
@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void addMetaData(String key, String data) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
if (metaData == null) {
metaData = new HashMap<>();
}
metaData.put(key, data);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null);
}
@Override

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin;
public interface ActiveMQPluginRunnable {
void run(ActiveMQServerPlugin plugin);
}

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public interface ActiveMQServerPlugin {
/**
* A connection has been created.
*
* @param connection The newly created connection
*/
default void afterCreateConnection(RemotingConnection connection) {
}
/**
* A connection has been destroyed.
*
* @param connection
*/
default void afterDestroyConnection(RemotingConnection connection) {
}
/**
* Before a session is created.
*
* @param name
* @param username
* @param minLargeMessageSize
* @param connection
* @param autoCommitSends
* @param autoCommitAcks
* @param preAcknowledge
* @param xa
* @param defaultAddress
* @param callback
* @param autoCreateQueues
* @param context
* @param prefixes
*/
default void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) {
}
/**
* After a session has been created.
*
* @param session The newly created session
*/
default void afterCreateSession(ServerSession session) {
}
/**
* Before a session is closed
*
* @param session
* @param failed
*/
default void beforeCloseSession(ServerSession session, boolean failed) {
}
/**
* After a session is closed
*
* @param session
* @param failed
*/
default void afterCloseSession(ServerSession session, boolean failed) {
}
/**
* Before session metadata is added to the session
*
* @param session
* @param key
* @param data
*/
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
}
/**
* After session metadata is added to the session
*
* @param session
* @param key
* @param data
*/
default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
}
/**
* Before a consumer is created
*
* @param consumerID
* @param queueName
* @param filterString
* @param browseOnly
* @param supportLargeMessage
*/
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) {
}
/**
* After a consumer has been created
*
* @param consumer the created consumer
*/
default void afterCreateConsumer(ServerConsumer consumer) {
}
/**
* Before a consumer is closed
*
* @param consumer
* @param failed
*/
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
}
/**
* After a consumer is closed
*
* @param consumer
* @param failed
*/
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
}
/**
* Before a queue is created
*
* @param queueConfig
*/
default void beforeCreateQueue(QueueConfig queueConfig) {
}
/**
* After a queue has been created
*
* @param queue The newly created queue
*/
default void afterCreateQueue(Queue queue) {
}
/**
* Before a queue is destroyed
*
* @param queueName
* @param session
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
*/
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
}
/**
* After a queue has been destroyed
*
* @param queue
* @param address
* @param session
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
*/
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
}
/**
* Before a message is sent
*
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
*/
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
}
/**
* After a message is sent
*
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param result
*/
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) {
}
/**
* Before a message is routed
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
*/
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
}
/**
* After a message is routed
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
* @param result
*/
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) {
}
/**
* Before a message is delivered to a client consumer
*
* @param reference
*/
default void beforeDeliver(MessageReference reference) {
}
/**
* After a message is delivered to a client consumer
*
* @param reference
*/
default void afterDeliver(MessageReference reference) {
}
/**
* A message has been expired
*
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
*/
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
}
/**
* A message has been acknowledged
*
* @param ref The acked message
* @param reason The ack reason
*/
default void messageAcknowledged(MessageReference ref, AckReason reason) {
}
/**
* Before a bridge is deployed
*
* @param config The bridge configuration
*/
default void beforeDeployBridge(BridgeConfiguration config) {
}
/**
* After a bridge has been deployed
*
* @param bridge The newly deployed bridge
*/
default void afterDeployBridge(Bridge bridge) {
}
}

View File

@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Add optional security for tests that need it
configureBrokerSecurity(server);
// Add extra configuration
addConfiguration(server);
server.start();
// Prepare all addresses and queues for client tests.
@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return server;
}
protected void addConfiguration(ActiveMQServer server) {
}
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.util.LinkedList;
import java.util.Map;
@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
final Executor executor, final ActiveMQServer server) {
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
addressSettingsRepository, executor, server);
}
@Override
@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
LocalFactory(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final StorageManager storageManager) {
super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
final StorageManager storageManager, final ActiveMQServer server) {
super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server);
}
@Override
public Queue createQueueWith(final QueueConfig config) {
queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(),
config.user(), config.pageSubscription(), config.isDurable(),
config.isTemporary(), config.isAutoCreated(), config.deliveryMode(),
config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor,
postOffice, storageManager, addressSettingsRepository,
executorFactory.getExecutor(), server);
return queue;
}
@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable,
temporary, autoCreated, RoutingType.MULTICAST, null, null,
scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
executorFactory.getExecutor(), server);
return queue;
}
}
LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager());
LocalFactory queueFactory =
new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(),
server.getAddressSettingsRepository(), server.getStorageManager(), server);
queueFactory.setPostOffice(server.getPostOffice());
@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
long txID = server.getStorageManager().generateID();
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
false, null, null, null, null, null, null),
server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
@ -29,9 +23,17 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
StorageManager storageManager,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
Executor executor) {
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
postOffice, storageManager, addressSettingsRepository, executor, null);
}
@Override

View File

@ -16,14 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase {
for (int i = 0; i < 100; i++) {
long txid = storage.generateID();
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"),
SimpleString.toSimpleString("topic"),
FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null,
true, false, false, server.getScheduledPool(), server.getPostOffice(),
storage, server.getAddressSettingsRepository(),
server.getExecutorFactory().getExecutor(), server);
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());

View File

@ -45,7 +45,7 @@ public class JmsResourceProvider {
/**
* Creates a connection.
*
* @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
* @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory)
*/
public Connection createConnection(ConnectionFactory cf) throws JMSException {
Connection connection = cf.createConnection();

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
*/
public class AmqpPluginTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class);
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
protected void addConfiguration(ActiveMQServer server) {
super.addConfiguration(server);
server.registerBrokerPlugin(verifier);
}
@Test(timeout = 60000)
public void testQueueReceiverReadAndAckMessage() throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
receiver.close();
connection.close();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
}
@Override
public void sendMessages(String destinationName, int count) throws Exception {
sendMessages(destinationName, count, null);
}
@Override
public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(destinationName);
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
if (routingType != null) {
message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
}
sender.send(message);
}
} finally {
connection.close();
}
}
}

View File

@ -0,0 +1,257 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
public class CorePluginTest extends JMSTestBase {
private Queue queue;
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
Configuration config = super.createDefaultConfig(netty);
config.registerBrokerPlugin(verifier);
return config;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
queue = createQueue("queue1");
}
@Test
public void testSendReceive() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
MessageConsumer cons = sess.createConsumer(queue);
TextMessage msg1 = sess.createTextMessage("test");
prod.send(msg1);
TextMessage received1 = (TextMessage)cons.receive(1000);
assertNotNull(received1);
conn.close();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
}
@Test
public void testDestroyQueue() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
sess.createProducer(queue);
conn.close();
server.destroyQueue(new SimpleString(queue.getQueueName()));
verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE);
}
@Test
public void testMessageExpireServer() throws Exception {
server.registerBrokerPlugin(new ExpiredPluginVerifier());
conn = cf.createConnection();
conn.setClientID("test");
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
prod.setTimeToLive(1);
MessageConsumer cons = sess.createConsumer(queue);
Thread.sleep(100);
TextMessage msg1 = sess.createTextMessage("test");
prod.send(msg1);
Thread.sleep(100);
assertNull(cons.receive(100));
conn.close();
verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
}
@Test
public void testMessageExpireClient() throws Exception {
server.registerBrokerPlugin(new ExpiredPluginVerifier());
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
prod.setTimeToLive(500);
MessageConsumer cons = sess.createConsumer(queue);
TextMessage msg1 = sess.createTextMessage("test");
prod.send(msg1);
Thread.sleep(500);
assertNull(cons.receive(500));
conn.close();
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
}
@Test
public void testSimpleBridge() throws Exception {
server.stop();
ActiveMQServer server0;
ActiveMQServer server1;
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(false, 0, false, server0Params);
Map<String, Object> server1Params = new HashMap<>();
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
server1 = createClusteredServerWithParams(false, 1, false, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
server0.registerBrokerPlugin(verifier);
ArrayList<String> connectorConfig = new ArrayList<>();
connectorConfig.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
.setQueueName(queueName0)
.setForwardingAddress(forwardAddress)
.setRetryInterval(1000)
.setReconnectAttemptsOnSameNode(-1)
.setUseDuplicateDetection(false)
.setStaticConnectors(connectorConfig);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
server1.start();
server0.start();
verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
server0.stop();
server1.stop();
}
private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) {
assertEquals(AckReason.EXPIRED, reason);
}
}
}

View File

@ -0,0 +1,276 @@
/**
*
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class MethodCalledVerifier implements ActiveMQServerPlugin {
private final Map<String, AtomicInteger> methodCalls;
public static final String AFTER_CREATE_CONNECTION = "afterCreateConnection";
public static final String AFTER_DESTROY_CONNECTION = "afterDestroyConnection";
public static final String BEFORE_CREATE_SESSION = "beforeCreateSession";
public static final String AFTER_CREATE_SESSION = "afterCreateSession";
public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession";
public static final String AFTER_CLOSE_SESSION = "afterCloseSession";
public static final String BEFORE_SESSION_METADATA_ADDED = "beforeSessionMetadataAdded";
public static final String AFTER_SESSION_METADATA_ADDED = "afterSessionMetadataAdded";
public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer";
public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer";
public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer";
public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer";
public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue";
public static final String AFTER_CREATE_QUEUE = "afterCreateQueue";
public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue";
public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue";
public static final String MESSAGE_EXPIRED = "messageExpired";
public static final String MESSAGE_ACKED = "messageAcknowledged";
public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend";
public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
public static final String BEFORE_DELIVER = "beforeDeliver";
public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
/**
* @param methods
*/
public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
super();
this.methodCalls = methodCalls;
}
@Override
public void afterCreateConnection(RemotingConnection connection) {
Preconditions.checkNotNull(connection);
methodCalled(AFTER_CREATE_CONNECTION);
}
@Override
public void afterDestroyConnection(RemotingConnection connection) {
Preconditions.checkNotNull(connection);
methodCalled(AFTER_DESTROY_CONNECTION);
}
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection,
boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
String defaultAddress, SessionCallback callback, boolean autoCreateQueues,
OperationContext context, Map<SimpleString, RoutingType> prefixes) {
Preconditions.checkNotNull(connection);
methodCalled(BEFORE_CREATE_SESSION);
}
@Override
public void afterCreateSession(ServerSession session) {
Preconditions.checkNotNull(session);
methodCalled(AFTER_CREATE_SESSION);
}
@Override
public void beforeCloseSession(ServerSession session, boolean failed) {
Preconditions.checkNotNull(session);
methodCalled(BEFORE_CLOSE_SESSION);
}
@Override
public void afterCloseSession(ServerSession session, boolean failed) {
Preconditions.checkNotNull(session);
methodCalled(AFTER_CLOSE_SESSION);
}
@Override
public void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
Preconditions.checkNotNull(key);
methodCalled(BEFORE_SESSION_METADATA_ADDED);
}
@Override
public void afterSessionMetadataAdded(ServerSession session, String key, String data) {
Preconditions.checkNotNull(key);
methodCalled(AFTER_SESSION_METADATA_ADDED);
}
@Override
public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
boolean browseOnly, boolean supportLargeMessage) {
Preconditions.checkNotNull(queueName);
methodCalled(BEFORE_CREATE_CONSUMER);
}
@Override
public void afterCreateConsumer(ServerConsumer consumer) {
Preconditions.checkNotNull(consumer);
methodCalled(AFTER_CREATE_CONSUMER);
}
@Override
public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
Preconditions.checkNotNull(consumer);
methodCalled(BEFORE_CLOSE_CONSUMER);
}
@Override
public void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
Preconditions.checkNotNull(consumer);
methodCalled(AFTER_CLOSE_CONSUMER);
}
@Override
public void beforeCreateQueue(QueueConfig queueConfig) {
Preconditions.checkNotNull(queueConfig);
methodCalled(BEFORE_CREATE_QUEUE);
}
@Override
public void afterCreateQueue(org.apache.activemq.artemis.core.server.Queue queue) {
Preconditions.checkNotNull(queue);
methodCalled(AFTER_CREATE_QUEUE);
}
@Override
public void beforeDestroyQueue(SimpleString queueName, SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
Preconditions.checkNotNull(queueName);
methodCalled(BEFORE_DESTROY_QUEUE);
}
@Override
public void afterDestroyQueue(Queue queue, SimpleString address, SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
Preconditions.checkNotNull(queue);
methodCalled(AFTER_DESTROY_QUEUE);
}
@Override
public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
Preconditions.checkNotNull(message);
methodCalled(MESSAGE_EXPIRED);
}
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) {
Preconditions.checkNotNull(ref);
Preconditions.checkNotNull(reason);
methodCalled(MESSAGE_ACKED);
}
@Override
public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
Preconditions.checkNotNull(message);
methodCalled(BEFORE_SEND);
}
@Override
public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(result);
methodCalled(AFTER_SEND);
}
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(context);
methodCalled(BEFORE_MESSAGE_ROUTE);
}
@Override
public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(result);
methodCalled(AFTER_MESSAGE_ROUTE);
}
@Override
public void beforeDeliver(MessageReference reference) {
Preconditions.checkNotNull(reference);
methodCalled(BEFORE_DELIVER);
}
@Override
public void afterDeliver(MessageReference reference) {
Preconditions.checkNotNull(reference);
methodCalled(AFTER_DELIVER);
}
@Override
public void beforeDeployBridge(BridgeConfiguration config) {
Preconditions.checkNotNull(config);
methodCalled(BEFORE_DEPLOY_BRIDGE);
}
@Override
public void afterDeployBridge(Bridge bridge) {
Preconditions.checkNotNull(bridge);
methodCalled(AFTER_DEPLOY_BRIDGE);
}
public void validatePluginMethodsEquals(int count, String... names) {
Arrays.asList(names).forEach(name -> {
assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get());
});
}
public void validatePluginMethodsAtLeast(int count, String... names) {
Arrays.asList(names).forEach(name -> {
assertTrue("validating method " + name, count <= methodCalls.getOrDefault(name, new AtomicInteger()).get());
});
}
private void methodCalled(String name) {
methodCalls.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.junit.Before;
import org.junit.Test;
public class MqttPluginTest extends MQTTTestSupport {
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
@Before
public void setUp() throws Exception {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
@Override
public void configureBroker() throws Exception {
super.configureBroker();
server.registerBrokerPlugin(verifier);
}
@Test(timeout = 60 * 1000)
public void testSendAndReceiveMQTT() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < NUM_MESSAGES; i++) {
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscriptionProvider.disconnect();
publishProvider.disconnect();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.junit.Test;
public class OpenwirePluginTest extends BasicOpenWireTest {
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, long pageSize,
long maxAddressSize, Map<String, AddressSettings> settings) {
ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
server.registerBrokerPlugin(verifier);
return server;
}
@Test
public void testAckedMessageAreConsumed() throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
// Reset the session.
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(1000);
assertNull(msg);
session.close();
connection.close();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
}
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class StompPluginTest extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String CLIENT_ID = "myclientid";
private StompClientConnectionV12 conn;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
conn = (StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = conn != null && conn.isConnected();
log.debug("Connection 1.2 : " + connected);
if (connected) {
conn.disconnect();
}
} finally {
super.tearDown();
}
}
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
protected JMSServerManager createServer() throws Exception {
JMSServerManager server = super.createServer();
server.getActiveMQServer().registerBrokerPlugin(verifier);
return server;
}
@Test
public void testSendAndReceive() throws Exception {
// subscribehoward county escaped
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub");
send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
ClientStompFrame frame = newConn.receiveFrame();
System.out.println("received " + frame);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
// unsub
unsubscribe(newConn, "a-sub");
newConn.disconnect();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
}
}

View File

@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test
public void testScheduledNoConsumer() throws Exception {
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
QueueImpl queue =
new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
false, scheduledExecutor, null, null, null,
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
// Send one scheduled
@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test
public void testScheduled() throws Exception {
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
QueueImpl queue =
new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
false, scheduledExecutor, null, null, null,
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
FakeConsumer consumer = null;
@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase {
public void disconnect() {
}
};
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
QueueImpl queue =
new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false,
scheduledExecutor, null, null, null,
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

View File

@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase {
}
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, executor);
return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
new FakePostOffice(), null, null, executor, null);
}
}

View File

@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory {
@Override
public Queue createQueueWith(final QueueConfig config) {
return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor);
return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
scheduledExecutor, postOffice, null, null, executor, null);
}
@Deprecated
@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, null, executor);
return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
scheduledExecutor, postOffice, null, null, executor, null);
}
@Override