From fe665061395769e38cbce21b2dafcce87a4b0c29 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Mon, 21 Oct 2019 15:00:55 -0400 Subject: [PATCH] ARTEMIS-2565 - Add plugin support for Federated Queues/Addresses Add a new interface called ActiveMQServerFederationPlugin to allow customization of the federated queue/address feature of the broker --- .../artemis/core/config/Configuration.java | 6 + .../core/config/impl/ConfigurationImpl.java | 41 ++- .../artemis/core/server/ActiveMQServer.java | 7 + .../core/server/ActiveMQServerLogger.java | 5 + .../federation/AbstractFederationStream.java | 141 +++++++++ .../server/federation/FederatedAbstract.java | 44 ++- .../federation/FederatedQueueConsumer.java | 174 +---------- .../FederatedQueueConsumerImpl.java | 233 ++++++++++++++ .../federation/FederationDownstream.java | 4 +- .../server/federation/FederationStream.java | 87 +----- .../server/federation/FederationUpstream.java | 6 +- .../federation/address/FederatedAddress.java | 25 +- .../federation/queue/FederatedQueue.java | 31 +- .../core/server/impl/ActiveMQServerImpl.java | 16 + .../ActiveMQServerFederationPlugin.java | 136 ++++++++ .../server/plugin/ActiveMQServerPlugin.java | 3 +- .../federation/FederatedAddressTest.java | 116 +------ .../federation/FederatedQueueTest.java | 127 ++------ .../federation/FederatedTestUtil.java | 181 +++++++++++ .../plugin/FederationBrokerPluginTest.java | 291 ++++++++++++++++++ .../plugin/MethodCalledVerifier.java | 93 +++++- 21 files changed, 1299 insertions(+), 468 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/AbstractFederationStream.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestUtil.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/FederationBrokerPluginTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 6f2c507b65..b8f7e7e2b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -24,6 +24,7 @@ import java.util.Properties; import java.util.Set; import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; @@ -1270,6 +1271,11 @@ public interface Configuration { */ List getBrokerCriticalPlugins(); + /** + * @return + */ + List getBrokerFederationPlugins(); + /** * @return */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8fc7262d12..dd3be11938 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -41,20 +41,6 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; -import org.apache.activemq.artemis.core.config.FederationConfiguration; -import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; -import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; @@ -67,21 +53,36 @@ import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.jboss.logging.Logger; @@ -277,6 +278,7 @@ public class ConfigurationImpl implements Configuration, Serializable { private final List brokerMessagePlugins = new CopyOnWriteArrayList<>(); private final List brokerBridgePlugins = new CopyOnWriteArrayList<>(); private final List brokerCriticalPlugins = new CopyOnWriteArrayList<>(); + private final List brokerFederationPlugins = new CopyOnWriteArrayList<>(); private Map> securityRoleNameMappings = new HashMap<>(); @@ -1494,6 +1496,9 @@ public class ConfigurationImpl implements Configuration, Serializable { if (plugin instanceof ActiveMQServerCriticalPlugin) { brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin); } + if (plugin instanceof ActiveMQServerFederationPlugin) { + brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin); + } } @Override @@ -1526,6 +1531,9 @@ public class ConfigurationImpl implements Configuration, Serializable { if (plugin instanceof ActiveMQServerCriticalPlugin) { brokerCriticalPlugins.remove(plugin); } + if (plugin instanceof ActiveMQServerFederationPlugin) { + brokerFederationPlugins.remove(plugin); + } } @Override @@ -1578,6 +1586,11 @@ public class ConfigurationImpl implements Configuration, Serializable { return brokerCriticalPlugins; } + @Override + public List getBrokerFederationPlugins() { + return brokerFederationPlugins; + } + @Override public List getFederationConfigurations() { return federationConfigurations; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index d8541dd5b3..23d5c055b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -256,6 +257,8 @@ public interface ActiveMQServer extends ServiceComponent { List getBrokerCriticalPlugins(); + List getBrokerFederationPlugins(); + void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; void callBrokerConnectionPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; @@ -276,6 +279,8 @@ public interface ActiveMQServer extends ServiceComponent { void callBrokerCriticalPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; + void callBrokerFederationPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; + boolean hasBrokerPlugins(); boolean hasBrokerConnectionPlugins(); @@ -296,6 +301,8 @@ public interface ActiveMQServer extends ServiceComponent { boolean hasBrokerCriticalPlugins(); + boolean hasBrokerFederationPlugins(); + void checkQueueCreationLimit(String username) throws Exception; ServerSession createSession(String name, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 72744ac99a..7ce15119e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1654,6 +1654,11 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void emptyAddressFile(String addressFile, String directory); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222286, value = "Error executing {0} federation plugin method.", + format = Message.Format.MESSAGE_FORMAT) + void federationPluginExecutionError(@Cause Throwable e, String pluginMethod); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/AbstractFederationStream.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/AbstractFederationStream.java new file mode 100644 index 0000000000..f9c4d12d84 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/AbstractFederationStream.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.federation; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.federation.FederationStreamConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress; +import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue; +import org.jboss.logging.Logger; + +public abstract class AbstractFederationStream implements FederationStream { + + + private static final Logger logger = Logger.getLogger(AbstractFederationStream.class); + protected final ActiveMQServer server; + protected final Federation federation; + protected final SimpleString name; + protected final FederationConnection connection; + private FederationStreamConfiguration config; + protected Map federatedQueueMap = new HashMap<>(); + protected Map federatedAddressMap = new HashMap<>(); + + + public AbstractFederationStream(ActiveMQServer server, Federation federation, String name, FederationStreamConfiguration config) { + this(server, federation, name, config, null); + } + + public AbstractFederationStream(final ActiveMQServer server, final Federation federation, final String name, final FederationStreamConfiguration config, + final FederationConnection connection) { + this.server = server; + this.federation = federation; + Objects.requireNonNull(config.getName()); + this.name = SimpleString.toSimpleString(config.getName()); + this.config = config; + this.connection = connection != null ? connection : new FederationConnection(server.getConfiguration(), name, config.getConnectionConfiguration()); + } + + @Override + public synchronized void start() { + if (connection != null) { + connection.start(); + } + } + + @Override + public synchronized void stop() { + if (connection != null) { + connection.stop(); + } + } + + @Override + public Federation getFederation() { + return federation; + } + + @Override + public FederationStreamConfiguration getConfig() { + return config; + } + + @Override + public SimpleString getName() { + return name; + } + + @Override + public FederationConnection getConnection() { + return connection; + } + + + @Override + public String getUser() { + String user = config.getConnectionConfiguration().getUsername(); + if (user == null || user.isEmpty()) { + return federation.getFederationUser(); + } else { + return user; + } + } + + @Override + public String getPassword() { + String password = config.getConnectionConfiguration().getPassword(); + if (password == null || password.isEmpty()) { + return federation.getFederationPassword(); + } else { + return password; + } + } + + @Override + public int getPriorityAdjustment() { + return config.getConnectionConfiguration().getPriorityAdjustment(); + } + + protected void callFederationStreamStartedPlugins() { + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.federationStreamStarted(this)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federationStreamStarted"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } + } + + protected void callFederationStreamStoppedPlugins() { + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.federationStreamStopped(this)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federationStreamStopped"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java index 8f2faafd32..75dd7c7ad6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java @@ -19,15 +19,21 @@ package org.apache.activemq.artemis.core.server.federation; import java.util.HashMap; import java.util.Map; + +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer.ClientSessionCallback; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl.ClientSessionCallback; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.jboss.logging.Logger; public abstract class FederatedAbstract implements ActiveMQServerBasePlugin { + private static final Logger logger = Logger.getLogger(FederatedAbstract.class); + private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration(); protected final Federation federation; protected ActiveMQServer server; @@ -104,9 +110,27 @@ public abstract class FederatedAbstract implements ActiveMQServerBasePlugin { if (started) { FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key); if (remoteQueueConsumer == null) { - remoteQueueConsumer = new FederatedQueueConsumer(federation, server, transformer, key, upstream, callback); + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } + remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback); remoteQueueConsumer.start(); remoteQueueConsumers.put(key, remoteQueueConsumer); + + if (server.hasBrokerFederationPlugins()) { + try { + final FederatedQueueConsumer finalConsumer = remoteQueueConsumer; + server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } } remoteQueueConsumer.incrementCount(); } @@ -116,10 +140,26 @@ public abstract class FederatedAbstract implements ActiveMQServerBasePlugin { public synchronized void removeRemoteConsumer(FederatedConsumerKey key) { FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key); if (remoteQueueConsumer != null) { + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.beforeCloseFederatedQueueConsumer(remoteQueueConsumer)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCloseFederatedQueueConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } if (remoteQueueConsumer.decrementCount() <= 0) { remoteQueueConsumer.close(); remoteQueueConsumers.remove(key); } + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.afterCloseFederatedQueueConsumer(remoteQueueConsumer)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCloseFederatedQueueConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java index a1e08686df..17824e0023 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java @@ -17,74 +17,15 @@ package org.apache.activemq.artemis.core.server.federation; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.client.ClientConsumer; -import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.MessageHandler; -import org.apache.activemq.artemis.api.core.client.SessionFailureListener; -import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.transformer.Transformer; -public class FederatedQueueConsumer implements MessageHandler, SessionFailureListener { +public interface FederatedQueueConsumer extends MessageHandler { - public static final String FEDERATION_NAME = "federation-name"; - public static final String FEDERATION_UPSTREAM_NAME = "federation-upstream-name"; - private final ActiveMQServer server; - private final Federation federation; - private final FederatedConsumerKey key; - private final Transformer transformer; - private final FederationUpstream upstream; - private final AtomicInteger count = new AtomicInteger(); - private final ScheduledExecutorService scheduledExecutorService; - private final int intialConnectDelayMultiplier = 2; - private final int intialConnectDelayMax = 30; - private final ClientSessionCallback clientSessionCallback; + String FEDERATION_NAME = "federation-name"; + String FEDERATION_UPSTREAM_NAME = "federation-upstream-name"; - private ClientSessionFactoryInternal clientSessionFactory; - private ClientSession clientSession; - private ClientConsumer clientConsumer; - - public FederatedQueueConsumer(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) { - this.federation = federation; - this.server = server; - this.key = key; - this.transformer = transformer; - this.upstream = upstream; - this.scheduledExecutorService = server.getScheduledPool(); - this.clientSessionCallback = clientSessionCallback; - } - - public int incrementCount() { - return count.incrementAndGet(); - } - - public int decrementCount() { - return count.decrementAndGet(); - } - - public void start() { - scheduleConnect(0); - } - - private void scheduleConnect(int delay) { - scheduledExecutorService.schedule(() -> { - try { - connect(); - } catch (Exception e) { - scheduleConnect(getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax)); - } - }, delay, TimeUnit.SECONDS); - } - - public static int getNextDelay(int delay, int delayMultiplier, int delayMax) { + static int getNextDelay(int delay, int delayMultiplier, int delayMax) { int nextDelay; if (delay == 0) { nextDelay = 1; @@ -97,108 +38,19 @@ public class FederatedQueueConsumer implements MessageHandler, SessionFailureLis return nextDelay; } - private void connect() throws Exception { - try { - if (clientConsumer == null) { - synchronized (this) { - this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory(); - this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize()); - this.clientSession.addFailureListener(this); - this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString()); - this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString()); - this.clientSession.start(); - if (clientSessionCallback != null) { - clientSessionCallback.callback(clientSession); - } - if (clientSession.queueQuery(key.getQueueName()).isExists()) { - this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false); - this.clientConsumer.setMessageHandler(this); - } else { - throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote"); - } - } - } - } catch (Exception e) { - try { - if (clientSessionFactory != null) { - clientSessionFactory.cleanup(); - } - disconnect(); - } catch (ActiveMQException ignored) { - } - throw e; - } - } + FederationUpstream getFederationUpstream(); - public void close() { - scheduleDisconnect(0); - } + Federation getFederation(); - private void scheduleDisconnect(int delay) { - scheduledExecutorService.schedule(() -> { - try { - disconnect(); - } catch (Exception ignored) { - } - }, delay, TimeUnit.SECONDS); - } + FederatedConsumerKey getKey(); - private void disconnect() throws ActiveMQException { - if (clientConsumer != null) { - clientConsumer.close(); - } - if (clientSession != null) { - clientSession.close(); - } - clientConsumer = null; - clientSession = null; + ClientSession getClientSession(); - if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() || - clientSessionFactory.numSessions() == 0)) { - clientSessionFactory.close(); - clientSessionFactory = null; - } - } + int incrementCount(); - @Override - public void onMessage(ClientMessage clientMessage) { - try { - Message message = transformer == null ? clientMessage : transformer.transform(clientMessage); - if (message != null) { - server.getPostOffice().route(message, true); - } - clientMessage.acknowledge(); - } catch (Exception e) { - try { - clientSession.rollback(); - } catch (ActiveMQException e1) { - } - } - } + int decrementCount(); - @Override - public void connectionFailed(ActiveMQException exception, boolean failedOver) { - connectionFailed(exception, failedOver, null); - } + void start(); - @Override - public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { - try { - clientSessionFactory.cleanup(); - clientSessionFactory.close(); - clientConsumer = null; - clientSession = null; - clientSessionFactory = null; - } catch (Throwable dontCare) { - } - start(); - } - - @Override - public void beforeReconnect(ActiveMQException exception) { - } - - public interface ClientSessionCallback { - void callback(ClientSession clientSession) throws ActiveMQException; - } -} \ No newline at end of file + void close(); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java new file mode 100644 index 0000000000..0f408a1dd2 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.federation; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.jboss.logging.Logger; + +public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener { + + private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class); + private final ActiveMQServer server; + private final Federation federation; + private final FederatedConsumerKey key; + private final Transformer transformer; + private final FederationUpstream upstream; + private final AtomicInteger count = new AtomicInteger(); + private final ScheduledExecutorService scheduledExecutorService; + private final int intialConnectDelayMultiplier = 2; + private final int intialConnectDelayMax = 30; + private final ClientSessionCallback clientSessionCallback; + + private ClientSessionFactoryInternal clientSessionFactory; + private ClientSession clientSession; + private ClientConsumer clientConsumer; + + public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) { + this.federation = federation; + this.server = server; + this.key = key; + this.transformer = transformer; + this.upstream = upstream; + this.scheduledExecutorService = server.getScheduledPool(); + this.clientSessionCallback = clientSessionCallback; + } + + @Override + public FederationUpstream getFederationUpstream() { + return upstream; + } + + @Override + public Federation getFederation() { + return federation; + } + + @Override + public FederatedConsumerKey getKey() { + return key; + } + + @Override + public ClientSession getClientSession() { + return clientSession; + } + + @Override + public int incrementCount() { + return count.incrementAndGet(); + } + + @Override + public int decrementCount() { + return count.decrementAndGet(); + } + + @Override + public void start() { + scheduleConnect(0); + } + + private void scheduleConnect(int delay) { + scheduledExecutorService.schedule(() -> { + try { + connect(); + } catch (Exception e) { + scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax)); + } + }, delay, TimeUnit.SECONDS); + } + + private void connect() throws Exception { + try { + if (clientConsumer == null) { + synchronized (this) { + this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory(); + this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize()); + this.clientSession.addFailureListener(this); + this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString()); + this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString()); + this.clientSession.start(); + if (clientSessionCallback != null) { + clientSessionCallback.callback(clientSession); + } + if (clientSession.queueQuery(key.getQueueName()).isExists()) { + this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false); + this.clientConsumer.setMessageHandler(this); + } else { + throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote"); + } + } + } + } catch (Exception e) { + try { + if (clientSessionFactory != null) { + clientSessionFactory.cleanup(); + } + disconnect(); + } catch (ActiveMQException ignored) { + } + throw e; + } + } + + @Override + public void close() { + scheduleDisconnect(0); + } + + private void scheduleDisconnect(int delay) { + scheduledExecutorService.schedule(() -> { + try { + disconnect(); + } catch (Exception ignored) { + } + }, delay, TimeUnit.SECONDS); + } + + private void disconnect() throws ActiveMQException { + if (clientConsumer != null) { + clientConsumer.close(); + } + if (clientSession != null) { + clientSession.close(); + } + clientConsumer = null; + clientSession = null; + + if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() || + clientSessionFactory.numSessions() == 0)) { + clientSessionFactory.close(); + clientSessionFactory = null; + } + } + + @Override + public void onMessage(ClientMessage clientMessage) { + try { + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } + + Message message = transformer == null ? clientMessage : transformer.transform(clientMessage); + if (message != null) { + server.getPostOffice().route(message, true); + } + clientMessage.acknowledge(); + + if (server.hasBrokerFederationPlugins()) { + try { + server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage)); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + } + } catch (Exception e) { + try { + clientSession.rollback(); + } catch (ActiveMQException e1) { + } + } + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + connectionFailed(exception, failedOver, null); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + try { + clientSessionFactory.cleanup(); + clientSessionFactory.close(); + clientConsumer = null; + clientSession = null; + clientSessionFactory = null; + } catch (Throwable dontCare) { + } + start(); + } + + @Override + public void beforeReconnect(ActiveMQException exception) { + } + + public interface ClientSessionCallback { + void callback(ClientSession clientSession) throws ActiveMQException; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationDownstream.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationDownstream.java index 2e77c861d7..ebaf21ee32 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationDownstream.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationDownstream.java @@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.jboss.logging.Logger; -public class FederationDownstream extends FederationStream implements SessionFailureListener { +public class FederationDownstream extends AbstractFederationStream implements SessionFailureListener { private static final Logger logger = Logger.getLogger(FederationDownstream.class); @@ -65,6 +65,7 @@ public class FederationDownstream extends FederationStream implements SessionFai @Override public synchronized void start() { super.start(); + callFederationStreamStartedPlugins(); try { deploy(federationConfiguration); } catch (ActiveMQException e) { @@ -75,6 +76,7 @@ public class FederationDownstream extends FederationStream implements SessionFai @Override public synchronized void stop() { super.stop(); + callFederationStreamStoppedPlugins(); } public void deploy(FederationConfiguration federationConfiguration) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationStream.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationStream.java index e983ba86dc..af805ea667 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationStream.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationStream.java @@ -17,93 +17,26 @@ package org.apache.activemq.artemis.core.server.federation; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.federation.FederationStreamConfiguration; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress; -import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue; -import org.jboss.logging.Logger; -public abstract class FederationStream { +public interface FederationStream { + void start(); - private static final Logger logger = Logger.getLogger(FederationStream.class); - protected final ActiveMQServer server; - protected final Federation federation; - protected final SimpleString name; - protected final FederationConnection connection; - private FederationStreamConfiguration config; - protected Map federatedQueueMap = new HashMap<>(); - protected Map federatedAddressMap = new HashMap<>(); + void stop(); + Federation getFederation(); - public FederationStream(ActiveMQServer server, Federation federation, String name, FederationStreamConfiguration config) { - this(server, federation, name, config, null); - } + FederationStreamConfiguration getConfig(); - public FederationStream(final ActiveMQServer server, final Federation federation, final String name, final FederationStreamConfiguration config, - final FederationConnection connection) { - this.server = server; - this.federation = federation; - Objects.requireNonNull(config.getName()); - this.name = SimpleString.toSimpleString(config.getName()); - this.config = config; - this.connection = connection != null ? connection : new FederationConnection(server.getConfiguration(), name, config.getConnectionConfiguration()); - } + SimpleString getName(); - public synchronized void start() { - if (connection != null) { - connection.start(); - } - } + FederationConnection getConnection(); - public synchronized void stop() { - if (connection != null) { - connection.stop(); - } - } + String getUser(); - public Federation getFederation() { - return federation; - } - - public FederationStreamConfiguration getConfig() { - return config; - } - - public SimpleString getName() { - return name; - } - - public FederationConnection getConnection() { - return connection; - } - - - public String getUser() { - String user = config.getConnectionConfiguration().getUsername(); - if (user == null || user.isEmpty()) { - return federation.getFederationUser(); - } else { - return user; - } - } - - public String getPassword() { - String password = config.getConnectionConfiguration().getPassword(); - if (password == null || password.isEmpty()) { - return federation.getFederationPassword(); - } else { - return password; - } - } - - public int getPriorityAdjustment() { - return config.getConnectionConfiguration().getPriorityAdjustment(); - } + String getPassword(); + int getPriorityAdjustment(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationUpstream.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationUpstream.java index 38663e457d..83a6517bb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationUpstream.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationUpstream.java @@ -31,7 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress; import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue; -public class FederationUpstream extends FederationStream { +public class FederationUpstream extends AbstractFederationStream { private FederationUpstreamConfiguration config; public FederationUpstream(ActiveMQServer server, Federation federation, String name, FederationUpstreamConfiguration config) { @@ -49,6 +49,8 @@ public class FederationUpstream extends FederationStream { for (FederatedAddress federatedAddress : federatedAddressMap.values()) { federatedAddress.start(); } + + callFederationStreamStartedPlugins(); } @Override @@ -64,6 +66,8 @@ public class FederationUpstream extends FederationStream { federatedQueueMap.clear(); super.stop(); + + callFederationStreamStoppedPlugins(); } public void deploy(Set policyRefsToDeploy, Map policyMap) throws ActiveMQException { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java index 329cbdc0af..9a10cdeae4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.federation.FederationAddressPolic import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.federation.FederatedAbstract; import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; @@ -45,6 +47,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.Match; import org.apache.activemq.artemis.utils.ByteUtil; +import org.jboss.logging.Logger; /** * Federated Address, replicate messages from the remote brokers address to itself. @@ -58,7 +61,9 @@ import org.apache.activemq.artemis.utils.ByteUtil; */ public class FederatedAddress extends FederatedAbstract implements ActiveMQServerQueuePlugin, Serializable { + private static final Logger logger = Logger.getLogger(FederatedAddress.class); public static final String FEDERATED_QUEUE_PREFIX = "federated"; + public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops"); private final SimpleString queueNameFormat; private final SimpleString filterString; @@ -105,7 +110,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe .stream() .filter(b -> b instanceof QueueBinding) .map(b -> ((QueueBinding) b).getQueue()) - .forEach(this::createRemoteConsumer); + .forEach(this::conditionalCreateRemoteConsumer); } /** @@ -115,6 +120,24 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe */ @Override public synchronized void afterCreateQueue(Queue queue) { + conditionalCreateRemoteConsumer(queue); + } + + private void conditionalCreateRemoteConsumer(Queue queue) { + if (server.hasBrokerFederationPlugins()) { + final AtomicBoolean conditionalCreate = new AtomicBoolean(true); + try { + server.callBrokerFederationPlugins(plugin -> { + conditionalCreate.set(conditionalCreate.get() && plugin.federatedAddressConditionalCreateConsumer(queue)); + }); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedAddressConditionalCreateConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + if (!conditionalCreate.get()) { + return; + } + } createRemoteConsumer(queue); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java index fcecf9c92b..79536251ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.config.WildcardConfiguration; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; @@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.federation.FederationUpstream; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.Match; +import org.jboss.logging.Logger; /** * Federated Queue, connect to upstream queues routing them to the local queue when a local consumer exist. @@ -51,6 +54,8 @@ import org.apache.activemq.artemis.core.settings.impl.Match; */ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable { + private static final Logger logger = Logger.getLogger(FederatedQueue.class); + private final Set includes; private final Set excludes; private final Filter metaDataFilter; @@ -93,7 +98,7 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC .stream() .filter(b -> b instanceof QueueBinding) .map(b -> (QueueBinding) b) - .forEach(b -> createRemoteConsumer(b.getQueue())); + .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue())); } /** @@ -103,18 +108,36 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC */ @Override public synchronized void afterCreateConsumer(ServerConsumer consumer) { - createRemoteConsumer(consumer); + conditionalCreateRemoteConsumer(consumer); } public FederationQueuePolicyConfiguration getConfig() { return config; } - private void createRemoteConsumer(Queue queue) { + private void conditionalCreateRemoteConsumer(ServerConsumer consumer) { + if (server.hasBrokerFederationPlugins()) { + final AtomicBoolean conditionalCreate = new AtomicBoolean(true); + try { + server.callBrokerFederationPlugins(plugin -> { + conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer)); + }); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + if (!conditionalCreate.get()) { + return; + } + } + createRemoteConsumer(consumer); + } + + private void conditionalCreateRemoteConsumer(Queue queue) { queue.getConsumers() .stream() .filter(consumer -> consumer instanceof ServerConsumer) - .map(c -> (ServerConsumer) c).forEach(this::createRemoteConsumer); + .map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer); } private void createRemoteConsumer(ServerConsumer consumer) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 71a9101518..f1e79c2c4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -156,6 +156,7 @@ import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -2330,6 +2331,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { return configuration.getBrokerCriticalPlugins(); } + @Override + public List getBrokerFederationPlugins() { + return configuration.getBrokerFederationPlugins(); + } + @Override public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException { callBrokerPlugins(getBrokerPlugins(), pluginRun); @@ -2380,6 +2386,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { callBrokerPlugins(getBrokerCriticalPlugins(), pluginRun); } + @Override + public void callBrokerFederationPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerFederationPlugins(), pluginRun); + } + private

void callBrokerPlugins(final List

plugins, final ActiveMQPluginRunnable

pluginRun) throws ActiveMQException { if (pluginRun != null) { for (P plugin : plugins) { @@ -2447,6 +2458,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { return !getBrokerCriticalPlugins().isEmpty(); } + @Override + public boolean hasBrokerFederationPlugins() { + return !getBrokerFederationPlugins().isEmpty(); + } + @Override public ExecutorFactory getExecutorFactory() { return executorFactory; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java new file mode 100644 index 0000000000..8e27693048 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.FederationStream; + +public interface ActiveMQServerFederationPlugin extends ActiveMQServerBasePlugin { + + /** + * After a federation stream has been started + * + * @param stream + * @throws ActiveMQException + */ + default void federationStreamStarted(final FederationStream stream) throws ActiveMQException { + + } + + /** + * After a federation stream has been stopped + * + * @param stream + * @throws ActiveMQException + */ + default void federationStreamStopped(final FederationStream stream) throws ActiveMQException { + + } + + /** + * Before a federated queue consumer is created + * + * @param key + * @throws ActiveMQException + */ + default void beforeCreateFederatedQueueConsumer(final FederatedConsumerKey key) throws ActiveMQException { + + } + + /** + * After a federated queue consumer is created + * + * @param consumer + * @throws ActiveMQException + */ + default void afterCreateFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException { + + } + + /** + * Before a federated queue consumer is closed + * + * @param consumer + * @throws ActiveMQException + */ + default void beforeCloseFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException { + + } + + /** + * After a federated queue consumer is closed + * + * @param consumer + * @throws ActiveMQException + */ + default void afterCloseFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException { + + } + + /** + * Before a federated queue consumer handles a message + * + * @param consumer + * @param message + * @throws ActiveMQException + */ + default void beforeFederatedQueueConsumerMessageHandled(final FederatedQueueConsumer consumer, Message message) throws ActiveMQException { + + } + + /** + * After a federated queue consumer handles a message + * + * @param consumer + * @param message + * @throws ActiveMQException + */ + default void afterFederatedQueueConsumerMessageHandled(final FederatedQueueConsumer consumer, Message message) throws ActiveMQException { + + } + + /** + * Conditionally create a federated queue consumer for a federated address. This allows custom + * logic to be inserted to decide when to create federated queue consumers + * + * @param queue + * @return if true, create the consumer, else if false don't create + * @throws ActiveMQException + */ + default boolean federatedAddressConditionalCreateConsumer(final Queue queue) throws ActiveMQException { + return true; + } + + /** + * Conditionally create a federated queue consumer for a federated queue. This allows custom + * logic to be inserted to decide when to create federated queue consumers + * + * @param consumer + * @return true, create the consumer, else if false don't create + * @throws ActiveMQException + */ + default boolean federatedQueueConditionalCreateConsumer(final ServerConsumer consumer) throws ActiveMQException { + return true; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index 426001129a..38f7ad2fcc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -30,5 +30,6 @@ public interface ActiveMQServerPlugin extends ActiveMQServerBindingPlugin, ActiveMQServerMessagePlugin, ActiveMQServerBridgePlugin, - ActiveMQServerCriticalPlugin { + ActiveMQServerCriticalPlugin, + ActiveMQServerFederationPlugin { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java index c0f9e1c6df..eea8925de3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java @@ -24,16 +24,9 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; -import java.util.Collections; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; -import org.apache.activemq.artemis.core.config.TransformerConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; @@ -60,12 +53,12 @@ public class FederatedAddressTest extends FederatedTestBase { public void testDownstreamFederatedAddressReplication() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server1", address, + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server1", address, getServer(0).getConfiguration().getTransportConfigurations("server0")[0]); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); - FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address, + FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, getServer(1).getConfiguration().getTransportConfigurations("server1")[0]); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2); getServer(1).getFederationManager().deploy(); @@ -77,12 +70,12 @@ public class FederatedAddressTest extends FederatedTestBase { public void testDownstreamFederatedAddressReplicationRef() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server1", address, + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server1", address, "server0"); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); - FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address, + FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, "server1"); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2); getServer(1).getFederationManager().deploy(); @@ -94,7 +87,7 @@ public class FederatedAddressTest extends FederatedTestBase { public void testDownstreamFederatedAddressReplicationRefOneWay() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address, + FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, "server1"); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2); getServer(1).getFederationManager().deploy(); @@ -106,11 +99,11 @@ public class FederatedAddressTest extends FederatedTestBase { public void testUpstreamFederatedAddressReplication() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); - FederationConfiguration federationConfiguration2 = createUpstreamFederationConfiguration("server0", address); + FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server0", address); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2); getServer(1).getFederationManager().deploy(); @@ -121,8 +114,8 @@ public class FederatedAddressTest extends FederatedTestBase { public void testDownstreamFederatedAddressReplicationRefOneWayTransformer() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address, "server1"); - addTransformerConfiguration(federationConfiguration2, address); + FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, "server1"); + FederatedTestUtil.addAddressTransformerConfiguration(federationConfiguration2, address); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2); getServer(1).getFederationManager().deploy(); @@ -158,7 +151,7 @@ public class FederatedAddressTest extends FederatedTestBase { public void testUpstreamFederatedAddressReplicationOneWay() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -169,8 +162,8 @@ public class FederatedAddressTest extends FederatedTestBase { public void testUpstreamFederatedAddressReplicationOneWayTransformer() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); - addTransformerConfiguration(federationConfiguration, address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); + FederatedTestUtil.addAddressTransformerConfiguration(federationConfiguration, address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -231,7 +224,6 @@ public class FederatedAddressTest extends FederatedTestBase { } - @Test public void testFederatedAddressDeployAfterQueuesExist() throws Exception { String address = getName(); @@ -257,7 +249,7 @@ public class FederatedAddressTest extends FederatedTestBase { assertNull(consumer0.receive(100)); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -275,7 +267,7 @@ public class FederatedAddressTest extends FederatedTestBase { public void testFederatedAddressRemoteBrokerRestart() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -333,7 +325,7 @@ public class FederatedAddressTest extends FederatedTestBase { public void testFederatedAddressLocalBrokerRestart() throws Exception { String address = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address); + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -398,12 +390,12 @@ public class FederatedAddressTest extends FederatedTestBase { // } //Connect broker 0 (consumer will be here at end of chain) to broker 1 - FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", address, 2); + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address, 2); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); //Connect broker 1 (middle of chain) to broker 2 - FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server2", address, 2); + FederationConfiguration federationConfiguration1 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server2", address, 2); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1); getServer(1).getFederationManager().deploy(); //Broker 2 we dont setup any federation as he is the upstream (head of the chain) @@ -434,80 +426,6 @@ public class FederatedAddressTest extends FederatedTestBase { } } - private FederationConfiguration createFederationConfiguration(String address, int hops) { - FederationAddressPolicyConfiguration addressPolicyConfiguration = new FederationAddressPolicyConfiguration(); - addressPolicyConfiguration.setName( "AddressPolicy" + address); - addressPolicyConfiguration.addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch(address)); - addressPolicyConfiguration.setMaxHops(hops); - - FederationConfiguration federationConfiguration = new FederationConfiguration(); - federationConfiguration.setName("default"); - federationConfiguration.addFederationPolicy(addressPolicyConfiguration); - - return federationConfiguration; - } - - private FederationConfiguration createUpstreamFederationConfiguration(String connector, String address, int hops) { - FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration(); - upstreamConfiguration.setName(connector); - upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); - upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); - upstreamConfiguration.addPolicyRef("AddressPolicy" + address); - - FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops); - federationConfiguration.addUpstreamConfiguration(upstreamConfiguration); - - return federationConfiguration; - } - private FederationConfiguration createUpstreamFederationConfiguration(String connector, String address) { - return createUpstreamFederationConfiguration(connector, address, 1); - } - - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration) { - return createDownstreamFederationConfiguration(connector, address, transportConfiguration, 1); - } - - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration, - int hops) { - FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); - downstreamConfiguration.setName(connector); - downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); - downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); - downstreamConfiguration.addPolicyRef("AddressPolicy" + address); - downstreamConfiguration.setUpstreamConfiguration(transportConfiguration); - - FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops); - federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); - - return federationConfiguration; - } - - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef, - int hops) { - FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); - downstreamConfiguration.setName(connector); - downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); - downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); - downstreamConfiguration.addPolicyRef("AddressPolicy" + address); - downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef); - - FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops); - federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); - - return federationConfiguration; - } - - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef) { - return createDownstreamFederationConfiguration(connector, address, transportConfigurationRef, 1); - } - - private void addTransformerConfiguration(final FederationConfiguration federationConfiguration, final String address) { - federationConfiguration.addTransformerConfiguration( - new FederationTransformerConfiguration("transformer", new TransformerConfiguration(TestTransformer.class.getName()))); - FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address); - policy.setTransformerRef("transformer"); - } - private Message createTextMessage(Session session1, String group) throws JMSException { Message message = session1.createTextMessage("hello"); message.setStringProperty("JMSXGroupID", group); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java index 16af3b5555..7dcd81e621 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java @@ -25,15 +25,11 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.Collections; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.FederationConfiguration; -import org.apache.activemq.artemis.core.config.TransformerConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; -import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.transformer.Transformer; @@ -63,7 +59,7 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeUpstream() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -74,7 +70,7 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName); //Favor federated broker over local consumers policy.setPriorityAdjustment(1); @@ -89,7 +85,7 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeDownstreamPriorityAdjustment() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1"); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1"); FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName); //Favor federated broker over local consumers policy.setPriorityAdjustment(1); @@ -157,8 +153,8 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeUpstreamTransformer() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); - addTransformerConfiguration(federationConfiguration, queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); + FederatedTestUtil.addQueueTransformerConfiguration(federationConfiguration, queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -169,7 +165,7 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeDownstream() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1"); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1"); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(1).getFederationManager().deploy(); @@ -180,8 +176,8 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteConsumeDownstreamTransformer() throws Exception { String queueName = getName(); - FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1"); - addTransformerConfiguration(federationConfiguration, queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1"); + FederatedTestUtil.addQueueTransformerConfiguration(federationConfiguration, queueName); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(1).getFederationManager().deploy(); @@ -256,7 +252,7 @@ public class FederatedQueueTest extends FederatedTestBase { assertNull(consumer0.receiveNoWait()); - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -273,11 +269,11 @@ public class FederatedQueueTest extends FederatedTestBase { for (int i = 0; i < 2; i++) { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); - FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server0", queueName); + FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0", queueName); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1); getServer(1).getFederationManager().deploy(); @@ -291,11 +287,11 @@ public class FederatedQueueTest extends FederatedTestBase { for (int i = 0; i < 2; i++) { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1", queueName, "server0"); + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", queueName, "server0"); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); - FederationConfiguration federationConfiguration1 = createDownstreamFederationConfiguration("server0", queueName, "server1"); + FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1"); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1); getServer(1).getFederationManager().deploy(); @@ -310,9 +306,9 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream", + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, false, "server0"); - FederationUpstreamConfiguration upstreamConfig = createFederationUpstream("server1", queueName); + FederationUpstreamConfiguration upstreamConfig = FederatedTestUtil.createQueueFederationUpstream("server1", queueName); federationConfiguration0.addUpstreamConfiguration(upstreamConfig); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); @@ -328,9 +324,9 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream", + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, true, "server0"); - FederationUpstreamConfiguration upstreamConfig = createFederationUpstream("server1", queueName); + FederationUpstreamConfiguration upstreamConfig = FederatedTestUtil.createQueueFederationUpstream("server1", queueName); upstreamConfig.getConnectionConfiguration().setShareConnection(true); federationConfiguration0.addUpstreamConfiguration(upstreamConfig); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); @@ -347,9 +343,9 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream", + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, false, "server0"); - federationConfiguration0.addUpstreamConfiguration(createFederationUpstream("server1", queueName)); + federationConfiguration0.addUpstreamConfiguration(FederatedTestUtil.createQueueFederationUpstream("server1", queueName)); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); @@ -364,9 +360,9 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream", + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, true, "server0"); - FederationUpstreamConfiguration upstreamConfiguration = createFederationUpstream("server1", queueName); + FederationUpstreamConfiguration upstreamConfiguration = FederatedTestUtil.createQueueFederationUpstream("server1", queueName); upstreamConfiguration.getConnectionConfiguration().setShareConnection(true); federationConfiguration0.addUpstreamConfiguration(upstreamConfiguration); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); @@ -469,12 +465,12 @@ public class FederatedQueueTest extends FederatedTestBase { } //Connect broker 0 (consumer will be here at end of chain) to broker 1 - FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", queueName, true); + FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName, true); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); //Connect broker 1 (middle of chain) to broker 2 - FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server2", queueName, true); + FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server2", queueName, true); getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1); getServer(1).getFederationManager().deploy(); //Broker 2 we dont setup any federation as he is the upstream (head of the chain) @@ -511,7 +507,7 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -566,7 +562,7 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); } - FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName); + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -615,79 +611,6 @@ public class FederatedQueueTest extends FederatedTestBase { assertNotNull(consumer0.receive(1000)); } - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated, - String transportConfigurationRef) { - return createDownstreamFederationConfiguration(null, connector, queueName, includeFederated, false, transportConfigurationRef); - } - - private FederationConfiguration createDownstreamFederationConfiguration(String name, String connector, String queueName, Boolean includeFederated, - boolean shareConnection, String transportConfigurationRef) { - FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); - downstreamConfiguration.setName(name != null ? name : connector); - downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); - downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); - downstreamConfiguration.getConnectionConfiguration().setShareConnection(shareConnection); - downstreamConfiguration.addPolicyRef("QueuePolicy" + queueName); - downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef); - - FederationConfiguration federationConfiguration = createFederationConfiguration(connector, queueName, includeFederated); - federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); - - return federationConfiguration; - } - - private FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, String transportConfigurationRef) { - return createDownstreamFederationConfiguration(null, connector, queueName, null, false, transportConfigurationRef); - } - - private FederationConfiguration createUpstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated) { - FederationUpstreamConfiguration upstreamConfiguration = createFederationUpstream(connector, queueName); - - FederationConfiguration federationConfiguration = createFederationConfiguration(connector, queueName, includeFederated); - federationConfiguration.addUpstreamConfiguration(upstreamConfiguration); - - return federationConfiguration; - } - - private FederationUpstreamConfiguration createFederationUpstream(String connector, String queueName) { - - FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration(); - upstreamConfiguration.setName("server1-upstream"); - upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); - upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); - upstreamConfiguration.addPolicyRef("QueuePolicy" + queueName); - - return upstreamConfiguration; - } - - private FederationConfiguration createUpstreamFederationConfiguration(String connector, String queueName) { - return createUpstreamFederationConfiguration(connector, queueName, null); - } - - private FederationConfiguration createFederationConfiguration(String connector, String queueName, Boolean includeFederated) { - - FederationQueuePolicyConfiguration queuePolicyConfiguration = new FederationQueuePolicyConfiguration(); - queuePolicyConfiguration.setName( "QueuePolicy" + queueName); - queuePolicyConfiguration.addInclude(new FederationQueuePolicyConfiguration.Matcher() - .setQueueMatch(queueName).setAddressMatch("#")); - if (includeFederated != null) { - queuePolicyConfiguration.setIncludeFederated(includeFederated); - } - - FederationConfiguration federationConfiguration = new FederationConfiguration(); - federationConfiguration.setName("default"); - federationConfiguration.addFederationPolicy(queuePolicyConfiguration); - - return federationConfiguration; - } - - private void addTransformerConfiguration(final FederationConfiguration federationConfiguration, final String queueName) { - federationConfiguration.addTransformerConfiguration( - new FederationTransformerConfiguration("transformer", new TransformerConfiguration(TestTransformer.class.getName()))); - FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName); - policy.setTransformerRef("transformer"); - } - private Message createTextMessage(Session session1, String group) throws JMSException { Message message = session1.createTextMessage("hello"); message.setStringProperty("JMSXGroupID", group); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestUtil.java new file mode 100644 index 0000000000..5ee2ae1031 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestUtil.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.federation; + +import java.util.Collections; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.config.TransformerConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration; + +public class FederatedTestUtil { + + public static FederationConfiguration createAddressFederationConfiguration(String address, int hops) { + FederationAddressPolicyConfiguration addressPolicyConfiguration = new FederationAddressPolicyConfiguration(); + addressPolicyConfiguration.setName( "AddressPolicy" + address); + addressPolicyConfiguration.addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch(address)); + addressPolicyConfiguration.setMaxHops(hops); + + FederationConfiguration federationConfiguration = new FederationConfiguration(); + federationConfiguration.setName("default"); + federationConfiguration.addFederationPolicy(addressPolicyConfiguration); + + return federationConfiguration; + } + + public static FederationConfiguration createAddressUpstreamFederationConfiguration(String connector, String address, int hops) { + FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration(); + upstreamConfiguration.setName(connector); + upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); + upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); + upstreamConfiguration.addPolicyRef("AddressPolicy" + address); + + FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops); + federationConfiguration.addUpstreamConfiguration(upstreamConfiguration); + + return federationConfiguration; + } + + public static FederationConfiguration createAddressUpstreamFederationConfiguration(String connector, String address) { + return createAddressUpstreamFederationConfiguration(connector, address, 1); + } + + public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration) { + return createAddressDownstreamFederationConfiguration(connector, address, transportConfiguration, 1); + } + + public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration, + int hops) { + FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); + downstreamConfiguration.setName(connector); + downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); + downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); + downstreamConfiguration.addPolicyRef("AddressPolicy" + address); + downstreamConfiguration.setUpstreamConfiguration(transportConfiguration); + + FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops); + federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); + + return federationConfiguration; + } + + public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef, + int hops) { + FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); + downstreamConfiguration.setName(connector); + downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); + downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); + downstreamConfiguration.addPolicyRef("AddressPolicy" + address); + downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef); + + FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops); + federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); + + return federationConfiguration; + } + + public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef) { + return createAddressDownstreamFederationConfiguration(connector, address, transportConfigurationRef, 1); + } + + public static void addAddressTransformerConfiguration(final FederationConfiguration federationConfiguration, final String address) { + federationConfiguration.addTransformerConfiguration( + new FederationTransformerConfiguration("transformer", new TransformerConfiguration(FederatedAddressTest.TestTransformer.class.getName()))); + FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address); + policy.setTransformerRef("transformer"); + } + + public static FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated, + String transportConfigurationRef) { + return createQueueDownstreamFederationConfiguration(null, connector, queueName, includeFederated, false, transportConfigurationRef); + } + + public static FederationConfiguration createQueueDownstreamFederationConfiguration(String name, String connector, String queueName, Boolean includeFederated, + boolean shareConnection, String transportConfigurationRef) { + FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration(); + downstreamConfiguration.setName(name != null ? name : connector); + downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); + downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); + downstreamConfiguration.getConnectionConfiguration().setShareConnection(shareConnection); + downstreamConfiguration.addPolicyRef("QueuePolicy" + queueName); + downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef); + + FederationConfiguration federationConfiguration = createQueueFederationConfiguration(connector, queueName, includeFederated); + federationConfiguration.addDownstreamConfiguration(downstreamConfiguration); + + return federationConfiguration; + } + + public static FederationConfiguration createQueueDownstreamFederationConfiguration(String connector, String queueName, String transportConfigurationRef) { + return createQueueDownstreamFederationConfiguration(null, connector, queueName, null, false, transportConfigurationRef); + } + + public static FederationConfiguration createQueueUpstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated) { + FederationUpstreamConfiguration upstreamConfiguration = createQueueFederationUpstream(connector, queueName); + + FederationConfiguration federationConfiguration = createQueueFederationConfiguration(connector, queueName, includeFederated); + federationConfiguration.addUpstreamConfiguration(upstreamConfiguration); + + return federationConfiguration; + } + + public static FederationUpstreamConfiguration createQueueFederationUpstream(String connector, String queueName) { + + FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration(); + upstreamConfiguration.setName("server1-upstream"); + upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector)); + upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1); + upstreamConfiguration.addPolicyRef("QueuePolicy" + queueName); + + return upstreamConfiguration; + } + + public static FederationConfiguration createQueueUpstreamFederationConfiguration(String connector, String queueName) { + return createQueueUpstreamFederationConfiguration(connector, queueName, null); + } + + public static FederationConfiguration createQueueFederationConfiguration(String connector, String queueName, Boolean includeFederated) { + + FederationQueuePolicyConfiguration queuePolicyConfiguration = new FederationQueuePolicyConfiguration(); + queuePolicyConfiguration.setName( "QueuePolicy" + queueName); + queuePolicyConfiguration.addInclude(new FederationQueuePolicyConfiguration.Matcher() + .setQueueMatch(queueName).setAddressMatch("#")); + if (includeFederated != null) { + queuePolicyConfiguration.setIncludeFederated(includeFederated); + } + + FederationConfiguration federationConfiguration = new FederationConfiguration(); + federationConfiguration.setName("default"); + federationConfiguration.addFederationPolicy(queuePolicyConfiguration); + + return federationConfiguration; + } + + public static void addQueueTransformerConfiguration(final FederationConfiguration federationConfiguration, final String queueName) { + federationConfiguration.addTransformerConfiguration( + new FederationTransformerConfiguration("transformer", new TransformerConfiguration(FederatedQueueTest.TestTransformer.class.getName()))); + FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName); + policy.setTransformerRef("transformer"); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/FederationBrokerPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/FederationBrokerPluginTest.java new file mode 100644 index 0000000000..02a5cd49a2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/FederationBrokerPluginTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase; +import org.apache.activemq.artemis.tests.integration.federation.FederatedTestUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_FEDERATED_QUEUE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATION_STREAM_STARTED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATION_STREAM_STOPPED; + +public class FederationBrokerPluginTest extends FederatedTestBase { + + + private final Map methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier0 = new MethodCalledVerifier(methodCalls); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + getServer(0).registerBrokerPlugin(verifier0); + } + + @Test + public void testFederationStreamStartStop() throws Exception { + String address = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATION_STREAM_STARTED); + getServer(0).getFederationManager().stop(); + verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATION_STREAM_STOPPED); + + } + + @Test + public void testFederationStreamConsumerAddressUpstream() throws Exception { + String address = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + testFederationStreamConsumerAddress(address); + } + + @Test + public void testFederationStreamConsumerAddressDownstream() throws Exception { + String address = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration( + "server0", address, "server1"); + getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(1).getFederationManager().deploy(); + + testFederationStreamConsumerAddress(address); + } + + + private void testFederationStreamConsumerAddress(String address) throws Exception { + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection1.start(); + connection0.start(); + + Session session0 = connection0.createSession(); + Session session1 = connection1.createSession(); + + Topic topic0 = session0.createTopic(address); + Topic topic1 = session1.createTopic(address); + + MessageConsumer consumer0 = session0.createConsumer(topic0); + MessageProducer producer1 = session1.createProducer(topic1); + + assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress( + SimpleString.toSimpleString(address)).getBindings().size() == 1, 5000, 500)); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER, + AFTER_CREATE_FEDERATED_QUEUE_CONSUMER, FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER); + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER); + + producer1.send(session1.createTextMessage("hello")); + assertNotNull(consumer0.receive(5000)); + + consumer0.close(); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER, BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED, + AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED); + } + } + + @Test + public void testFederationStreamConsumerQueueUpstream() throws Exception { + String queueName = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + testFederationStreamConsumerQueue(queueName); + } + + @Test + public void testFederationStreamConsumerQueueDownstream() throws Exception { + String queueName = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration( + "server0", queueName, "server1"); + getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(1).getFederationManager().deploy(); + + testFederationStreamConsumerQueue(queueName); + } + + private void testFederationStreamConsumerQueue(String queueName) throws Exception { + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection0.start(); + connection1.start(); + + Session session0 = connection0.createSession(); + Session session1 = connection1.createSession(); + + Queue queue0 = session0.createQueue(queueName); + Queue queue1 = session1.createQueue(queueName); + + MessageProducer producer1 = session1.createProducer(queue1); + producer1.send(session1.createTextMessage("hello")); + + MessageConsumer consumer0 = session0.createConsumer(queue0); + assertNotNull(consumer0.receive(1000)); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER, + AFTER_CREATE_FEDERATED_QUEUE_CONSUMER, FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER); + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER); + + consumer0.close(); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER, BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED, + AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED); + + } + } + + @Test + public void testFederatedAddressConditional() throws Exception { + String address = getName(); + + getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() { + @Override + public boolean federatedAddressConditionalCreateConsumer(org.apache.activemq.artemis.core.server.Queue queue) { + //always return false for test + return false; + } + }); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection1.start(); + connection0.start(); + + Session session0 = connection0.createSession(); + Session session1 = connection1.createSession(); + + Topic topic0 = session0.createTopic(address); + Topic topic1 = session1.createTopic(address); + + MessageConsumer consumer0 = session0.createConsumer(topic0); + MessageProducer producer1 = session1.createProducer(topic1); + + assertFalse(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress( + SimpleString.toSimpleString(address)).getBindings().size() > 0, 2000, 500)); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER); + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER, + AFTER_CREATE_FEDERATED_QUEUE_CONSUMER); + + producer1.send(session1.createTextMessage("hello")); + assertNull(consumer0.receive(1000)); + consumer0.close(); + + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER); + + } + } + + @Test + public void testFederatedQueueConditional() throws Exception { + String queueName = getName(); + + getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() { + @Override + public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) { + //always return false for test + return false; + } + }); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection0.start(); + connection1.start(); + + Session session0 = connection0.createSession(); + Session session1 = connection1.createSession(); + + Queue queue0 = session0.createQueue(queueName); + Queue queue1 = session1.createQueue(queueName); + + MessageProducer producer1 = session1.createProducer(queue1); + producer1.send(session1.createTextMessage("hello")); + + MessageConsumer consumer0 = session0.createConsumer(queue0); + assertNull(consumer0.receive(1000)); + + verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER); + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER, + AFTER_CREATE_FEDERATED_QUEUE_CONSUMER); + consumer0.close(); + + verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER, + AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER); + + } + } + + protected ConnectionFactory getCF(int i) throws Exception { + return new ActiveMQConnectionFactory("vm://" + i); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index e14adf1cd2..2255060250 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -3,9 +3,6 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; @@ -13,7 +10,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; - import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -32,6 +28,9 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; @@ -40,6 +39,9 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.tests.util.Wait; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this @@ -101,6 +103,16 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge"; public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge"; public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge"; + public static final String FEDERATION_STREAM_STARTED = "federationStreamStarted"; + public static final String FEDERATION_STREAM_STOPPED = "federationStreamStopped"; + public static final String BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER = "beforeCreateFederatedQueueConsumer"; + public static final String AFTER_CREATE_FEDERATED_QUEUE_CONSUMER = "afterCreateFederatedQueueConsumer"; + public static final String BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER = "beforeCloseFederatedQueueConsumer"; + public static final String AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER = "afterCloseFederatedQueueConsumer"; + public static final String FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER = "federatedAddressConditionalCreateConsumer"; + public static final String FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER = "federatedQueueConditionalCreateConsumer"; + public static final String BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED = "beforeFederatedQueueConsumerMessageHandled"; + public static final String AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED = "afterFederatedQueueConsumerMessageHandled"; public MethodCalledVerifier(Map methodCalls) { super(); @@ -381,16 +393,87 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { methodCalled(AFTER_ACKNOWLEDGE_BRIDGE); } + @Override + public void federationStreamStarted(FederationStream stream) throws ActiveMQException { + Preconditions.checkNotNull(stream); + methodCalled(FEDERATION_STREAM_STARTED); + } + + @Override + public void federationStreamStopped(FederationStream stream) throws ActiveMQException { + Preconditions.checkNotNull(stream); + methodCalled(FEDERATION_STREAM_STOPPED); + } + + @Override + public void beforeCreateFederatedQueueConsumer(FederatedConsumerKey key) throws ActiveMQException { + Preconditions.checkNotNull(key); + methodCalled(BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER); + } + + @Override + public void afterCreateFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CREATE_FEDERATED_QUEUE_CONSUMER); + } + + @Override + public void beforeCloseFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + methodCalled(BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER); + } + + @Override + public void afterCloseFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER); + } + + @Override + public void beforeFederatedQueueConsumerMessageHandled(FederatedQueueConsumer consumer, + Message message) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + Preconditions.checkNotNull(message); + methodCalled(BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED); + } + + @Override + public void afterFederatedQueueConsumerMessageHandled(FederatedQueueConsumer consumer, + Message message) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + Preconditions.checkNotNull(message); + methodCalled(AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED); + } + + @Override + public boolean federatedAddressConditionalCreateConsumer(Queue queue) throws ActiveMQException { + Preconditions.checkNotNull(queue); + methodCalled(FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER); + return true; + } + + @Override + public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) throws ActiveMQException { + Preconditions.checkNotNull(consumer); + methodCalled(FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER); + return true; + } + public void validatePluginMethodsEquals(int count, String... names) { + validatePluginMethodsEquals(count, Wait.MAX_WAIT_MILLIS, Wait.SLEEP_MILLIS); + } + + public void validatePluginMethodsEquals(int count, long timeout, long sleepMillis, String... names) { Arrays.asList(names).forEach(name -> { try { - Wait.waitFor(() -> count == methodCalls.getOrDefault(name, new AtomicInteger()).get()); + Wait.waitFor(() -> count == methodCalls.getOrDefault(name, new AtomicInteger()).get(), timeout, sleepMillis); } catch (Throwable ignored) { } assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get()); }); } + public void validatePluginMethodsAtLeast(int count, String... names) { Arrays.asList(names).forEach(name -> { try {