ARTEMIS-2565 This closes #2903

This commit is contained in:
Christopher L. Shannon (cshannon) 2020-01-06 06:53:36 -05:00
commit 3743bc9d9f
21 changed files with 1299 additions and 468 deletions

View File

@ -24,6 +24,7 @@ import java.util.Properties;
import java.util.Set;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
@ -1270,6 +1271,11 @@ public interface Configuration {
*/
List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins();
/**
* @return
*/
List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins();
/**
* @return
*/

View File

@ -41,20 +41,6 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -67,21 +53,36 @@ import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.jboss.logging.Logger;
@ -277,6 +278,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
private final List<ActiveMQServerMessagePlugin> brokerMessagePlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerFederationPlugin> brokerFederationPlugins = new CopyOnWriteArrayList<>();
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
@ -1494,6 +1496,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerCriticalPlugin) {
brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin);
}
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
}
}
@Override
@ -1526,6 +1531,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerCriticalPlugin) {
brokerCriticalPlugins.remove(plugin);
}
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.remove(plugin);
}
}
@Override
@ -1578,6 +1586,11 @@ public class ConfigurationImpl implements Configuration, Serializable {
return brokerCriticalPlugins;
}
@Override
public List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins() {
return brokerFederationPlugins;
}
@Override
public List<FederationConfiguration> getFederationConfigurations() {
return federationConfigurations;

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@ -256,6 +257,8 @@ public interface ActiveMQServer extends ServiceComponent {
List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins();
List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins();
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException;
@ -276,6 +279,8 @@ public interface ActiveMQServer extends ServiceComponent {
void callBrokerCriticalPlugins(ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException;
void callBrokerFederationPlugins(ActiveMQPluginRunnable<ActiveMQServerFederationPlugin> pluginRun) throws ActiveMQException;
boolean hasBrokerPlugins();
boolean hasBrokerConnectionPlugins();
@ -296,6 +301,8 @@ public interface ActiveMQServer extends ServiceComponent {
boolean hasBrokerCriticalPlugins();
boolean hasBrokerFederationPlugins();
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,

View File

@ -1654,6 +1654,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void emptyAddressFile(String addressFile, String directory);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222286, value = "Error executing {0} federation plugin method.",
format = Message.Format.MESSAGE_FORMAT)
void federationPluginExecutionError(@Cause Throwable e, String pluginMethod);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.federation;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.federation.FederationStreamConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress;
import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue;
import org.jboss.logging.Logger;
public abstract class AbstractFederationStream implements FederationStream {
private static final Logger logger = Logger.getLogger(AbstractFederationStream.class);
protected final ActiveMQServer server;
protected final Federation federation;
protected final SimpleString name;
protected final FederationConnection connection;
private FederationStreamConfiguration config;
protected Map<String, FederatedQueue> federatedQueueMap = new HashMap<>();
protected Map<String, FederatedAddress> federatedAddressMap = new HashMap<>();
public AbstractFederationStream(ActiveMQServer server, Federation federation, String name, FederationStreamConfiguration config) {
this(server, federation, name, config, null);
}
public AbstractFederationStream(final ActiveMQServer server, final Federation federation, final String name, final FederationStreamConfiguration config,
final FederationConnection connection) {
this.server = server;
this.federation = federation;
Objects.requireNonNull(config.getName());
this.name = SimpleString.toSimpleString(config.getName());
this.config = config;
this.connection = connection != null ? connection : new FederationConnection(server.getConfiguration(), name, config.getConnectionConfiguration());
}
@Override
public synchronized void start() {
if (connection != null) {
connection.start();
}
}
@Override
public synchronized void stop() {
if (connection != null) {
connection.stop();
}
}
@Override
public Federation getFederation() {
return federation;
}
@Override
public FederationStreamConfiguration getConfig() {
return config;
}
@Override
public SimpleString getName() {
return name;
}
@Override
public FederationConnection getConnection() {
return connection;
}
@Override
public String getUser() {
String user = config.getConnectionConfiguration().getUsername();
if (user == null || user.isEmpty()) {
return federation.getFederationUser();
} else {
return user;
}
}
@Override
public String getPassword() {
String password = config.getConnectionConfiguration().getPassword();
if (password == null || password.isEmpty()) {
return federation.getFederationPassword();
} else {
return password;
}
}
@Override
public int getPriorityAdjustment() {
return config.getConnectionConfiguration().getPriorityAdjustment();
}
protected void callFederationStreamStartedPlugins() {
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.federationStreamStarted(this));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federationStreamStarted");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
}
protected void callFederationStreamStoppedPlugins() {
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.federationStreamStopped(this));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federationStreamStopped");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
}
}

View File

@ -19,15 +19,21 @@ package org.apache.activemq.artemis.core.server.federation;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer.ClientSessionCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl.ClientSessionCallback;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {
private static final Logger logger = Logger.getLogger(FederatedAbstract.class);
private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration();
protected final Federation federation;
protected ActiveMQServer server;
@ -104,9 +110,27 @@ public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {
if (started) {
FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
if (remoteQueueConsumer == null) {
remoteQueueConsumer = new FederatedQueueConsumer(federation, server, transformer, key, upstream, callback);
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback);
remoteQueueConsumer.start();
remoteQueueConsumers.put(key, remoteQueueConsumer);
if (server.hasBrokerFederationPlugins()) {
try {
final FederatedQueueConsumer finalConsumer = remoteQueueConsumer;
server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
}
remoteQueueConsumer.incrementCount();
}
@ -116,10 +140,26 @@ public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {
public synchronized void removeRemoteConsumer(FederatedConsumerKey key) {
FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
if (remoteQueueConsumer != null) {
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.beforeCloseFederatedQueueConsumer(remoteQueueConsumer));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCloseFederatedQueueConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
if (remoteQueueConsumer.decrementCount() <= 0) {
remoteQueueConsumer.close();
remoteQueueConsumers.remove(key);
}
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.afterCloseFederatedQueueConsumer(remoteQueueConsumer));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCloseFederatedQueueConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
}
}

View File

@ -17,74 +17,15 @@
package org.apache.activemq.artemis.core.server.federation;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
public class FederatedQueueConsumer implements MessageHandler, SessionFailureListener {
public interface FederatedQueueConsumer extends MessageHandler {
public static final String FEDERATION_NAME = "federation-name";
public static final String FEDERATION_UPSTREAM_NAME = "federation-upstream-name";
private final ActiveMQServer server;
private final Federation federation;
private final FederatedConsumerKey key;
private final Transformer transformer;
private final FederationUpstream upstream;
private final AtomicInteger count = new AtomicInteger();
private final ScheduledExecutorService scheduledExecutorService;
private final int intialConnectDelayMultiplier = 2;
private final int intialConnectDelayMax = 30;
private final ClientSessionCallback clientSessionCallback;
String FEDERATION_NAME = "federation-name";
String FEDERATION_UPSTREAM_NAME = "federation-upstream-name";
private ClientSessionFactoryInternal clientSessionFactory;
private ClientSession clientSession;
private ClientConsumer clientConsumer;
public FederatedQueueConsumer(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
this.federation = federation;
this.server = server;
this.key = key;
this.transformer = transformer;
this.upstream = upstream;
this.scheduledExecutorService = server.getScheduledPool();
this.clientSessionCallback = clientSessionCallback;
}
public int incrementCount() {
return count.incrementAndGet();
}
public int decrementCount() {
return count.decrementAndGet();
}
public void start() {
scheduleConnect(0);
}
private void scheduleConnect(int delay) {
scheduledExecutorService.schedule(() -> {
try {
connect();
} catch (Exception e) {
scheduleConnect(getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
}
}, delay, TimeUnit.SECONDS);
}
public static int getNextDelay(int delay, int delayMultiplier, int delayMax) {
static int getNextDelay(int delay, int delayMultiplier, int delayMax) {
int nextDelay;
if (delay == 0) {
nextDelay = 1;
@ -97,108 +38,19 @@ public class FederatedQueueConsumer implements MessageHandler, SessionFailureLis
return nextDelay;
}
private void connect() throws Exception {
try {
if (clientConsumer == null) {
synchronized (this) {
this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
this.clientSession.addFailureListener(this);
this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
this.clientSession.start();
if (clientSessionCallback != null) {
clientSessionCallback.callback(clientSession);
}
if (clientSession.queueQuery(key.getQueueName()).isExists()) {
this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
this.clientConsumer.setMessageHandler(this);
} else {
throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
}
}
}
} catch (Exception e) {
try {
if (clientSessionFactory != null) {
clientSessionFactory.cleanup();
}
disconnect();
} catch (ActiveMQException ignored) {
}
throw e;
}
}
FederationUpstream getFederationUpstream();
public void close() {
scheduleDisconnect(0);
}
Federation getFederation();
private void scheduleDisconnect(int delay) {
scheduledExecutorService.schedule(() -> {
try {
disconnect();
} catch (Exception ignored) {
}
}, delay, TimeUnit.SECONDS);
}
FederatedConsumerKey getKey();
private void disconnect() throws ActiveMQException {
if (clientConsumer != null) {
clientConsumer.close();
}
if (clientSession != null) {
clientSession.close();
}
clientConsumer = null;
clientSession = null;
ClientSession getClientSession();
if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
clientSessionFactory.numSessions() == 0)) {
clientSessionFactory.close();
clientSessionFactory = null;
}
}
int incrementCount();
@Override
public void onMessage(ClientMessage clientMessage) {
try {
Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
if (message != null) {
server.getPostOffice().route(message, true);
}
clientMessage.acknowledge();
} catch (Exception e) {
try {
clientSession.rollback();
} catch (ActiveMQException e1) {
}
}
}
int decrementCount();
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
connectionFailed(exception, failedOver, null);
}
void start();
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
try {
clientSessionFactory.cleanup();
clientSessionFactory.close();
clientConsumer = null;
clientSession = null;
clientSessionFactory = null;
} catch (Throwable dontCare) {
}
start();
}
@Override
public void beforeReconnect(ActiveMQException exception) {
}
public interface ClientSessionCallback {
void callback(ClientSession clientSession) throws ActiveMQException;
}
}
void close();
}

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.federation;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
private final ActiveMQServer server;
private final Federation federation;
private final FederatedConsumerKey key;
private final Transformer transformer;
private final FederationUpstream upstream;
private final AtomicInteger count = new AtomicInteger();
private final ScheduledExecutorService scheduledExecutorService;
private final int intialConnectDelayMultiplier = 2;
private final int intialConnectDelayMax = 30;
private final ClientSessionCallback clientSessionCallback;
private ClientSessionFactoryInternal clientSessionFactory;
private ClientSession clientSession;
private ClientConsumer clientConsumer;
public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
this.federation = federation;
this.server = server;
this.key = key;
this.transformer = transformer;
this.upstream = upstream;
this.scheduledExecutorService = server.getScheduledPool();
this.clientSessionCallback = clientSessionCallback;
}
@Override
public FederationUpstream getFederationUpstream() {
return upstream;
}
@Override
public Federation getFederation() {
return federation;
}
@Override
public FederatedConsumerKey getKey() {
return key;
}
@Override
public ClientSession getClientSession() {
return clientSession;
}
@Override
public int incrementCount() {
return count.incrementAndGet();
}
@Override
public int decrementCount() {
return count.decrementAndGet();
}
@Override
public void start() {
scheduleConnect(0);
}
private void scheduleConnect(int delay) {
scheduledExecutorService.schedule(() -> {
try {
connect();
} catch (Exception e) {
scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
}
}, delay, TimeUnit.SECONDS);
}
private void connect() throws Exception {
try {
if (clientConsumer == null) {
synchronized (this) {
this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
this.clientSession.addFailureListener(this);
this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
this.clientSession.start();
if (clientSessionCallback != null) {
clientSessionCallback.callback(clientSession);
}
if (clientSession.queueQuery(key.getQueueName()).isExists()) {
this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
this.clientConsumer.setMessageHandler(this);
} else {
throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
}
}
}
} catch (Exception e) {
try {
if (clientSessionFactory != null) {
clientSessionFactory.cleanup();
}
disconnect();
} catch (ActiveMQException ignored) {
}
throw e;
}
}
@Override
public void close() {
scheduleDisconnect(0);
}
private void scheduleDisconnect(int delay) {
scheduledExecutorService.schedule(() -> {
try {
disconnect();
} catch (Exception ignored) {
}
}, delay, TimeUnit.SECONDS);
}
private void disconnect() throws ActiveMQException {
if (clientConsumer != null) {
clientConsumer.close();
}
if (clientSession != null) {
clientSession.close();
}
clientConsumer = null;
clientSession = null;
if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
clientSessionFactory.numSessions() == 0)) {
clientSessionFactory.close();
clientSessionFactory = null;
}
}
@Override
public void onMessage(ClientMessage clientMessage) {
try {
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
if (message != null) {
server.getPostOffice().route(message, true);
}
clientMessage.acknowledge();
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
}
} catch (Exception e) {
try {
clientSession.rollback();
} catch (ActiveMQException e1) {
}
}
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
connectionFailed(exception, failedOver, null);
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
try {
clientSessionFactory.cleanup();
clientSessionFactory.close();
clientConsumer = null;
clientSession = null;
clientSessionFactory = null;
} catch (Throwable dontCare) {
}
start();
}
@Override
public void beforeReconnect(ActiveMQException exception) {
}
public interface ClientSessionCallback {
void callback(ClientSession clientSession) throws ActiveMQException;
}
}

View File

@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.jboss.logging.Logger;
public class FederationDownstream extends FederationStream implements SessionFailureListener {
public class FederationDownstream extends AbstractFederationStream implements SessionFailureListener {
private static final Logger logger = Logger.getLogger(FederationDownstream.class);
@ -65,6 +65,7 @@ public class FederationDownstream extends FederationStream implements SessionFai
@Override
public synchronized void start() {
super.start();
callFederationStreamStartedPlugins();
try {
deploy(federationConfiguration);
} catch (ActiveMQException e) {
@ -75,6 +76,7 @@ public class FederationDownstream extends FederationStream implements SessionFai
@Override
public synchronized void stop() {
super.stop();
callFederationStreamStoppedPlugins();
}
public void deploy(FederationConfiguration federationConfiguration)

View File

@ -17,93 +17,26 @@
package org.apache.activemq.artemis.core.server.federation;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.federation.FederationStreamConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress;
import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue;
import org.jboss.logging.Logger;
public abstract class FederationStream {
public interface FederationStream {
void start();
private static final Logger logger = Logger.getLogger(FederationStream.class);
protected final ActiveMQServer server;
protected final Federation federation;
protected final SimpleString name;
protected final FederationConnection connection;
private FederationStreamConfiguration config;
protected Map<String, FederatedQueue> federatedQueueMap = new HashMap<>();
protected Map<String, FederatedAddress> federatedAddressMap = new HashMap<>();
void stop();
Federation getFederation();
public FederationStream(ActiveMQServer server, Federation federation, String name, FederationStreamConfiguration config) {
this(server, federation, name, config, null);
}
FederationStreamConfiguration getConfig();
public FederationStream(final ActiveMQServer server, final Federation federation, final String name, final FederationStreamConfiguration config,
final FederationConnection connection) {
this.server = server;
this.federation = federation;
Objects.requireNonNull(config.getName());
this.name = SimpleString.toSimpleString(config.getName());
this.config = config;
this.connection = connection != null ? connection : new FederationConnection(server.getConfiguration(), name, config.getConnectionConfiguration());
}
SimpleString getName();
public synchronized void start() {
if (connection != null) {
connection.start();
}
}
FederationConnection getConnection();
public synchronized void stop() {
if (connection != null) {
connection.stop();
}
}
String getUser();
public Federation getFederation() {
return federation;
}
public FederationStreamConfiguration getConfig() {
return config;
}
public SimpleString getName() {
return name;
}
public FederationConnection getConnection() {
return connection;
}
public String getUser() {
String user = config.getConnectionConfiguration().getUsername();
if (user == null || user.isEmpty()) {
return federation.getFederationUser();
} else {
return user;
}
}
public String getPassword() {
String password = config.getConnectionConfiguration().getPassword();
if (password == null || password.isEmpty()) {
return federation.getFederationPassword();
} else {
return password;
}
}
public int getPriorityAdjustment() {
return config.getConnectionConfiguration().getPriorityAdjustment();
}
String getPassword();
int getPriorityAdjustment();
}

View File

@ -31,7 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.federation.address.FederatedAddress;
import org.apache.activemq.artemis.core.server.federation.queue.FederatedQueue;
public class FederationUpstream extends FederationStream {
public class FederationUpstream extends AbstractFederationStream {
private FederationUpstreamConfiguration config;
public FederationUpstream(ActiveMQServer server, Federation federation, String name, FederationUpstreamConfiguration config) {
@ -49,6 +49,8 @@ public class FederationUpstream extends FederationStream {
for (FederatedAddress federatedAddress : federatedAddressMap.values()) {
federatedAddress.start();
}
callFederationStreamStartedPlugins();
}
@Override
@ -64,6 +66,8 @@ public class FederationUpstream extends FederationStream {
federatedQueueMap.clear();
super.stop();
callFederationStreamStoppedPlugins();
}
public void deploy(Set<String> policyRefsToDeploy, Map<String, FederationPolicy> policyMap) throws ActiveMQException {

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.federation.FederationAddressPolic
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.federation.FederatedAbstract;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
@ -45,6 +47,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.Match;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
/**
* Federated Address, replicate messages from the remote brokers address to itself.
@ -58,7 +61,9 @@ import org.apache.activemq.artemis.utils.ByteUtil;
*/
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerQueuePlugin, Serializable {
private static final Logger logger = Logger.getLogger(FederatedAddress.class);
public static final String FEDERATED_QUEUE_PREFIX = "federated";
public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
private final SimpleString queueNameFormat;
private final SimpleString filterString;
@ -105,7 +110,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
.stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> ((QueueBinding) b).getQueue())
.forEach(this::createRemoteConsumer);
.forEach(this::conditionalCreateRemoteConsumer);
}
/**
@ -115,6 +120,24 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
*/
@Override
public synchronized void afterCreateQueue(Queue queue) {
conditionalCreateRemoteConsumer(queue);
}
private void conditionalCreateRemoteConsumer(Queue queue) {
if (server.hasBrokerFederationPlugins()) {
final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
try {
server.callBrokerFederationPlugins(plugin -> {
conditionalCreate.set(conditionalCreate.get() && plugin.federatedAddressConditionalCreateConsumer(queue));
});
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedAddressConditionalCreateConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
if (!conditionalCreate.get()) {
return;
}
}
createRemoteConsumer(queue);
}

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.federation.FederationUpstream;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.Match;
import org.jboss.logging.Logger;
/**
* Federated Queue, connect to upstream queues routing them to the local queue when a local consumer exist.
@ -51,6 +54,8 @@ import org.apache.activemq.artemis.core.settings.impl.Match;
*/
public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable {
private static final Logger logger = Logger.getLogger(FederatedQueue.class);
private final Set<Matcher> includes;
private final Set<Matcher> excludes;
private final Filter metaDataFilter;
@ -93,7 +98,7 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC
.stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> (QueueBinding) b)
.forEach(b -> createRemoteConsumer(b.getQueue()));
.forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
}
/**
@ -103,18 +108,36 @@ public class FederatedQueue extends FederatedAbstract implements ActiveMQServerC
*/
@Override
public synchronized void afterCreateConsumer(ServerConsumer consumer) {
createRemoteConsumer(consumer);
conditionalCreateRemoteConsumer(consumer);
}
public FederationQueuePolicyConfiguration getConfig() {
return config;
}
private void createRemoteConsumer(Queue queue) {
private void conditionalCreateRemoteConsumer(ServerConsumer consumer) {
if (server.hasBrokerFederationPlugins()) {
final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
try {
server.callBrokerFederationPlugins(plugin -> {
conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer));
});
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
if (!conditionalCreate.get()) {
return;
}
}
createRemoteConsumer(consumer);
}
private void conditionalCreateRemoteConsumer(Queue queue) {
queue.getConsumers()
.stream()
.filter(consumer -> consumer instanceof ServerConsumer)
.map(c -> (ServerConsumer) c).forEach(this::createRemoteConsumer);
.map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer);
}
private void createRemoteConsumer(ServerConsumer consumer) {

View File

@ -156,6 +156,7 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@ -2330,6 +2331,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return configuration.getBrokerCriticalPlugins();
}
@Override
public List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins() {
return configuration.getBrokerFederationPlugins();
}
@Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
callBrokerPlugins(getBrokerPlugins(), pluginRun);
@ -2380,6 +2386,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callBrokerPlugins(getBrokerCriticalPlugins(), pluginRun);
}
@Override
public void callBrokerFederationPlugins(final ActiveMQPluginRunnable<ActiveMQServerFederationPlugin> pluginRun) throws ActiveMQException {
callBrokerPlugins(getBrokerFederationPlugins(), pluginRun);
}
private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
if (pluginRun != null) {
for (P plugin : plugins) {
@ -2447,6 +2458,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return !getBrokerCriticalPlugins().isEmpty();
}
@Override
public boolean hasBrokerFederationPlugins() {
return !getBrokerFederationPlugins().isEmpty();
}
@Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer;
import org.apache.activemq.artemis.core.server.federation.FederationStream;
public interface ActiveMQServerFederationPlugin extends ActiveMQServerBasePlugin {
/**
* After a federation stream has been started
*
* @param stream
* @throws ActiveMQException
*/
default void federationStreamStarted(final FederationStream stream) throws ActiveMQException {
}
/**
* After a federation stream has been stopped
*
* @param stream
* @throws ActiveMQException
*/
default void federationStreamStopped(final FederationStream stream) throws ActiveMQException {
}
/**
* Before a federated queue consumer is created
*
* @param key
* @throws ActiveMQException
*/
default void beforeCreateFederatedQueueConsumer(final FederatedConsumerKey key) throws ActiveMQException {
}
/**
* After a federated queue consumer is created
*
* @param consumer
* @throws ActiveMQException
*/
default void afterCreateFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException {
}
/**
* Before a federated queue consumer is closed
*
* @param consumer
* @throws ActiveMQException
*/
default void beforeCloseFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException {
}
/**
* After a federated queue consumer is closed
*
* @param consumer
* @throws ActiveMQException
*/
default void afterCloseFederatedQueueConsumer(final FederatedQueueConsumer consumer) throws ActiveMQException {
}
/**
* Before a federated queue consumer handles a message
*
* @param consumer
* @param message
* @throws ActiveMQException
*/
default void beforeFederatedQueueConsumerMessageHandled(final FederatedQueueConsumer consumer, Message message) throws ActiveMQException {
}
/**
* After a federated queue consumer handles a message
*
* @param consumer
* @param message
* @throws ActiveMQException
*/
default void afterFederatedQueueConsumerMessageHandled(final FederatedQueueConsumer consumer, Message message) throws ActiveMQException {
}
/**
* Conditionally create a federated queue consumer for a federated address. This allows custom
* logic to be inserted to decide when to create federated queue consumers
*
* @param queue
* @return if true, create the consumer, else if false don't create
* @throws ActiveMQException
*/
default boolean federatedAddressConditionalCreateConsumer(final Queue queue) throws ActiveMQException {
return true;
}
/**
* Conditionally create a federated queue consumer for a federated queue. This allows custom
* logic to be inserted to decide when to create federated queue consumers
*
* @param consumer
* @return true, create the consumer, else if false don't create
* @throws ActiveMQException
*/
default boolean federatedQueueConditionalCreateConsumer(final ServerConsumer consumer) throws ActiveMQException {
return true;
}
}

View File

@ -30,5 +30,6 @@ public interface ActiveMQServerPlugin extends
ActiveMQServerBindingPlugin,
ActiveMQServerMessagePlugin,
ActiveMQServerBridgePlugin,
ActiveMQServerCriticalPlugin {
ActiveMQServerCriticalPlugin,
ActiveMQServerFederationPlugin {
}

View File

@ -24,16 +24,9 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
@ -60,12 +53,12 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testDownstreamFederatedAddressReplication() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server1", address,
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server1", address,
getServer(0).getConfiguration().getTransportConfigurations("server0")[0]);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address,
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address,
getServer(1).getConfiguration().getTransportConfigurations("server1")[0]);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();
@ -77,12 +70,12 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testDownstreamFederatedAddressReplicationRef() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server1", address,
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server1", address,
"server0");
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address,
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address,
"server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();
@ -94,7 +87,7 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testDownstreamFederatedAddressReplicationRefOneWay() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address,
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address,
"server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();
@ -106,11 +99,11 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testUpstreamFederatedAddressReplication() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
FederationConfiguration federationConfiguration2 = createUpstreamFederationConfiguration("server0", address);
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server0", address);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();
@ -121,8 +114,8 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testDownstreamFederatedAddressReplicationRefOneWayTransformer() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration2 = createDownstreamFederationConfiguration("server0", address, "server1");
addTransformerConfiguration(federationConfiguration2, address);
FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressDownstreamFederationConfiguration("server0", address, "server1");
FederatedTestUtil.addAddressTransformerConfiguration(federationConfiguration2, address);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();
@ -158,7 +151,7 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testUpstreamFederatedAddressReplicationOneWay() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -169,8 +162,8 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testUpstreamFederatedAddressReplicationOneWayTransformer() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
addTransformerConfiguration(federationConfiguration, address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
FederatedTestUtil.addAddressTransformerConfiguration(federationConfiguration, address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -231,7 +224,6 @@ public class FederatedAddressTest extends FederatedTestBase {
}
@Test
public void testFederatedAddressDeployAfterQueuesExist() throws Exception {
String address = getName();
@ -257,7 +249,7 @@ public class FederatedAddressTest extends FederatedTestBase {
assertNull(consumer0.receive(100));
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -275,7 +267,7 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testFederatedAddressRemoteBrokerRestart() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -333,7 +325,7 @@ public class FederatedAddressTest extends FederatedTestBase {
public void testFederatedAddressLocalBrokerRestart() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", address);
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -398,12 +390,12 @@ public class FederatedAddressTest extends FederatedTestBase {
// }
//Connect broker 0 (consumer will be here at end of chain) to broker 1
FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", address, 2);
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address, 2);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
//Connect broker 1 (middle of chain) to broker 2
FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server2", address, 2);
FederationConfiguration federationConfiguration1 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server2", address, 2);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
getServer(1).getFederationManager().deploy();
//Broker 2 we dont setup any federation as he is the upstream (head of the chain)
@ -434,80 +426,6 @@ public class FederatedAddressTest extends FederatedTestBase {
}
}
private FederationConfiguration createFederationConfiguration(String address, int hops) {
FederationAddressPolicyConfiguration addressPolicyConfiguration = new FederationAddressPolicyConfiguration();
addressPolicyConfiguration.setName( "AddressPolicy" + address);
addressPolicyConfiguration.addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch(address));
addressPolicyConfiguration.setMaxHops(hops);
FederationConfiguration federationConfiguration = new FederationConfiguration();
federationConfiguration.setName("default");
federationConfiguration.addFederationPolicy(addressPolicyConfiguration);
return federationConfiguration;
}
private FederationConfiguration createUpstreamFederationConfiguration(String connector, String address, int hops) {
FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration();
upstreamConfiguration.setName(connector);
upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
upstreamConfiguration.addPolicyRef("AddressPolicy" + address);
FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops);
federationConfiguration.addUpstreamConfiguration(upstreamConfiguration);
return federationConfiguration;
}
private FederationConfiguration createUpstreamFederationConfiguration(String connector, String address) {
return createUpstreamFederationConfiguration(connector, address, 1);
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration) {
return createDownstreamFederationConfiguration(connector, address, transportConfiguration, 1);
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration,
int hops) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.addPolicyRef("AddressPolicy" + address);
downstreamConfiguration.setUpstreamConfiguration(transportConfiguration);
FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef,
int hops) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.addPolicyRef("AddressPolicy" + address);
downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef);
FederationConfiguration federationConfiguration = createFederationConfiguration(address, hops);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef) {
return createDownstreamFederationConfiguration(connector, address, transportConfigurationRef, 1);
}
private void addTransformerConfiguration(final FederationConfiguration federationConfiguration, final String address) {
federationConfiguration.addTransformerConfiguration(
new FederationTransformerConfiguration("transformer", new TransformerConfiguration(TestTransformer.class.getName())));
FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address);
policy.setTransformerRef("transformer");
}
private Message createTextMessage(Session session1, String group) throws JMSException {
Message message = session1.createTextMessage("hello");
message.setStringProperty("JMSXGroupID", group);

View File

@ -25,15 +25,11 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
@ -63,7 +59,7 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeUpstream() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -74,7 +70,7 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName);
//Favor federated broker over local consumers
policy.setPriorityAdjustment(1);
@ -89,7 +85,7 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeDownstreamPriorityAdjustment() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1");
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1");
FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName);
//Favor federated broker over local consumers
policy.setPriorityAdjustment(1);
@ -157,8 +153,8 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeUpstreamTransformer() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
addTransformerConfiguration(federationConfiguration, queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
FederatedTestUtil.addQueueTransformerConfiguration(federationConfiguration, queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -169,7 +165,7 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeDownstream() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1");
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(1).getFederationManager().deploy();
@ -180,8 +176,8 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteConsumeDownstreamTransformer() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = createDownstreamFederationConfiguration("server0", queueName, "server1");
addTransformerConfiguration(federationConfiguration, queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1");
FederatedTestUtil.addQueueTransformerConfiguration(federationConfiguration, queueName);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(1).getFederationManager().deploy();
@ -256,7 +252,7 @@ public class FederatedQueueTest extends FederatedTestBase {
assertNull(consumer0.receiveNoWait());
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -273,11 +269,11 @@ public class FederatedQueueTest extends FederatedTestBase {
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server0", queueName);
FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0", queueName);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
getServer(1).getFederationManager().deploy();
@ -291,11 +287,11 @@ public class FederatedQueueTest extends FederatedTestBase {
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1", queueName, "server0");
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", queueName, "server0");
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
FederationConfiguration federationConfiguration1 = createDownstreamFederationConfiguration("server0", queueName, "server1");
FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", queueName, "server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
getServer(1).getFederationManager().deploy();
@ -310,9 +306,9 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream",
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
FederationUpstreamConfiguration upstreamConfig = createFederationUpstream("server1", queueName);
FederationUpstreamConfiguration upstreamConfig = FederatedTestUtil.createQueueFederationUpstream("server1", queueName);
federationConfiguration0.addUpstreamConfiguration(upstreamConfig);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@ -328,9 +324,9 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream",
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
FederationUpstreamConfiguration upstreamConfig = createFederationUpstream("server1", queueName);
FederationUpstreamConfiguration upstreamConfig = FederatedTestUtil.createQueueFederationUpstream("server1", queueName);
upstreamConfig.getConnectionConfiguration().setShareConnection(true);
federationConfiguration0.addUpstreamConfiguration(upstreamConfig);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
@ -347,9 +343,9 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream",
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
federationConfiguration0.addUpstreamConfiguration(createFederationUpstream("server1", queueName));
federationConfiguration0.addUpstreamConfiguration(FederatedTestUtil.createQueueFederationUpstream("server1", queueName));
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@ -364,9 +360,9 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = createDownstreamFederationConfiguration("server1-downstream",
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
FederationUpstreamConfiguration upstreamConfiguration = createFederationUpstream("server1", queueName);
FederationUpstreamConfiguration upstreamConfiguration = FederatedTestUtil.createQueueFederationUpstream("server1", queueName);
upstreamConfiguration.getConnectionConfiguration().setShareConnection(true);
federationConfiguration0.addUpstreamConfiguration(upstreamConfiguration);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
@ -469,12 +465,12 @@ public class FederatedQueueTest extends FederatedTestBase {
}
//Connect broker 0 (consumer will be here at end of chain) to broker 1
FederationConfiguration federationConfiguration0 = createUpstreamFederationConfiguration("server1", queueName, true);
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName, true);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
//Connect broker 1 (middle of chain) to broker 2
FederationConfiguration federationConfiguration1 = createUpstreamFederationConfiguration("server2", queueName, true);
FederationConfiguration federationConfiguration1 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server2", queueName, true);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
getServer(1).getFederationManager().deploy();
//Broker 2 we dont setup any federation as he is the upstream (head of the chain)
@ -511,7 +507,7 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -566,7 +562,7 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -615,79 +611,6 @@ public class FederatedQueueTest extends FederatedTestBase {
assertNotNull(consumer0.receive(1000));
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated,
String transportConfigurationRef) {
return createDownstreamFederationConfiguration(null, connector, queueName, includeFederated, false, transportConfigurationRef);
}
private FederationConfiguration createDownstreamFederationConfiguration(String name, String connector, String queueName, Boolean includeFederated,
boolean shareConnection, String transportConfigurationRef) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(name != null ? name : connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.getConnectionConfiguration().setShareConnection(shareConnection);
downstreamConfiguration.addPolicyRef("QueuePolicy" + queueName);
downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef);
FederationConfiguration federationConfiguration = createFederationConfiguration(connector, queueName, includeFederated);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
private FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, String transportConfigurationRef) {
return createDownstreamFederationConfiguration(null, connector, queueName, null, false, transportConfigurationRef);
}
private FederationConfiguration createUpstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated) {
FederationUpstreamConfiguration upstreamConfiguration = createFederationUpstream(connector, queueName);
FederationConfiguration federationConfiguration = createFederationConfiguration(connector, queueName, includeFederated);
federationConfiguration.addUpstreamConfiguration(upstreamConfiguration);
return federationConfiguration;
}
private FederationUpstreamConfiguration createFederationUpstream(String connector, String queueName) {
FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration();
upstreamConfiguration.setName("server1-upstream");
upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
upstreamConfiguration.addPolicyRef("QueuePolicy" + queueName);
return upstreamConfiguration;
}
private FederationConfiguration createUpstreamFederationConfiguration(String connector, String queueName) {
return createUpstreamFederationConfiguration(connector, queueName, null);
}
private FederationConfiguration createFederationConfiguration(String connector, String queueName, Boolean includeFederated) {
FederationQueuePolicyConfiguration queuePolicyConfiguration = new FederationQueuePolicyConfiguration();
queuePolicyConfiguration.setName( "QueuePolicy" + queueName);
queuePolicyConfiguration.addInclude(new FederationQueuePolicyConfiguration.Matcher()
.setQueueMatch(queueName).setAddressMatch("#"));
if (includeFederated != null) {
queuePolicyConfiguration.setIncludeFederated(includeFederated);
}
FederationConfiguration federationConfiguration = new FederationConfiguration();
federationConfiguration.setName("default");
federationConfiguration.addFederationPolicy(queuePolicyConfiguration);
return federationConfiguration;
}
private void addTransformerConfiguration(final FederationConfiguration federationConfiguration, final String queueName) {
federationConfiguration.addTransformerConfiguration(
new FederationTransformerConfiguration("transformer", new TransformerConfiguration(TestTransformer.class.getName())));
FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName);
policy.setTransformerRef("transformer");
}
private Message createTextMessage(Session session1, String group) throws JMSException {
Message message = session1.createTextMessage("hello");
message.setStringProperty("JMSXGroupID", group);

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.federation;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationTransformerConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
public class FederatedTestUtil {
public static FederationConfiguration createAddressFederationConfiguration(String address, int hops) {
FederationAddressPolicyConfiguration addressPolicyConfiguration = new FederationAddressPolicyConfiguration();
addressPolicyConfiguration.setName( "AddressPolicy" + address);
addressPolicyConfiguration.addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch(address));
addressPolicyConfiguration.setMaxHops(hops);
FederationConfiguration federationConfiguration = new FederationConfiguration();
federationConfiguration.setName("default");
federationConfiguration.addFederationPolicy(addressPolicyConfiguration);
return federationConfiguration;
}
public static FederationConfiguration createAddressUpstreamFederationConfiguration(String connector, String address, int hops) {
FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration();
upstreamConfiguration.setName(connector);
upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
upstreamConfiguration.addPolicyRef("AddressPolicy" + address);
FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops);
federationConfiguration.addUpstreamConfiguration(upstreamConfiguration);
return federationConfiguration;
}
public static FederationConfiguration createAddressUpstreamFederationConfiguration(String connector, String address) {
return createAddressUpstreamFederationConfiguration(connector, address, 1);
}
public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration) {
return createAddressDownstreamFederationConfiguration(connector, address, transportConfiguration, 1);
}
public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, TransportConfiguration transportConfiguration,
int hops) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.addPolicyRef("AddressPolicy" + address);
downstreamConfiguration.setUpstreamConfiguration(transportConfiguration);
FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef,
int hops) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.addPolicyRef("AddressPolicy" + address);
downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef);
FederationConfiguration federationConfiguration = createAddressFederationConfiguration(address, hops);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
public static FederationConfiguration createAddressDownstreamFederationConfiguration(String connector, String address, String transportConfigurationRef) {
return createAddressDownstreamFederationConfiguration(connector, address, transportConfigurationRef, 1);
}
public static void addAddressTransformerConfiguration(final FederationConfiguration federationConfiguration, final String address) {
federationConfiguration.addTransformerConfiguration(
new FederationTransformerConfiguration("transformer", new TransformerConfiguration(FederatedAddressTest.TestTransformer.class.getName())));
FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address);
policy.setTransformerRef("transformer");
}
public static FederationConfiguration createDownstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated,
String transportConfigurationRef) {
return createQueueDownstreamFederationConfiguration(null, connector, queueName, includeFederated, false, transportConfigurationRef);
}
public static FederationConfiguration createQueueDownstreamFederationConfiguration(String name, String connector, String queueName, Boolean includeFederated,
boolean shareConnection, String transportConfigurationRef) {
FederationDownstreamConfiguration downstreamConfiguration = new FederationDownstreamConfiguration();
downstreamConfiguration.setName(name != null ? name : connector);
downstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
downstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
downstreamConfiguration.getConnectionConfiguration().setShareConnection(shareConnection);
downstreamConfiguration.addPolicyRef("QueuePolicy" + queueName);
downstreamConfiguration.setUpstreamConfigurationRef(transportConfigurationRef);
FederationConfiguration federationConfiguration = createQueueFederationConfiguration(connector, queueName, includeFederated);
federationConfiguration.addDownstreamConfiguration(downstreamConfiguration);
return federationConfiguration;
}
public static FederationConfiguration createQueueDownstreamFederationConfiguration(String connector, String queueName, String transportConfigurationRef) {
return createQueueDownstreamFederationConfiguration(null, connector, queueName, null, false, transportConfigurationRef);
}
public static FederationConfiguration createQueueUpstreamFederationConfiguration(String connector, String queueName, Boolean includeFederated) {
FederationUpstreamConfiguration upstreamConfiguration = createQueueFederationUpstream(connector, queueName);
FederationConfiguration federationConfiguration = createQueueFederationConfiguration(connector, queueName, includeFederated);
federationConfiguration.addUpstreamConfiguration(upstreamConfiguration);
return federationConfiguration;
}
public static FederationUpstreamConfiguration createQueueFederationUpstream(String connector, String queueName) {
FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration();
upstreamConfiguration.setName("server1-upstream");
upstreamConfiguration.getConnectionConfiguration().setStaticConnectors(Collections.singletonList(connector));
upstreamConfiguration.getConnectionConfiguration().setCircuitBreakerTimeout(-1);
upstreamConfiguration.addPolicyRef("QueuePolicy" + queueName);
return upstreamConfiguration;
}
public static FederationConfiguration createQueueUpstreamFederationConfiguration(String connector, String queueName) {
return createQueueUpstreamFederationConfiguration(connector, queueName, null);
}
public static FederationConfiguration createQueueFederationConfiguration(String connector, String queueName, Boolean includeFederated) {
FederationQueuePolicyConfiguration queuePolicyConfiguration = new FederationQueuePolicyConfiguration();
queuePolicyConfiguration.setName( "QueuePolicy" + queueName);
queuePolicyConfiguration.addInclude(new FederationQueuePolicyConfiguration.Matcher()
.setQueueMatch(queueName).setAddressMatch("#"));
if (includeFederated != null) {
queuePolicyConfiguration.setIncludeFederated(includeFederated);
}
FederationConfiguration federationConfiguration = new FederationConfiguration();
federationConfiguration.setName("default");
federationConfiguration.addFederationPolicy(queuePolicyConfiguration);
return federationConfiguration;
}
public static void addQueueTransformerConfiguration(final FederationConfiguration federationConfiguration, final String queueName) {
federationConfiguration.addTransformerConfiguration(
new FederationTransformerConfiguration("transformer", new TransformerConfiguration(FederatedQueueTest.TestTransformer.class.getName())));
FederationQueuePolicyConfiguration policy = (FederationQueuePolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("QueuePolicy" + queueName);
policy.setTransformerRef("transformer");
}
}

View File

@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase;
import org.apache.activemq.artemis.tests.integration.federation.FederatedTestUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_FEDERATED_QUEUE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATION_STREAM_STARTED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.FEDERATION_STREAM_STOPPED;
public class FederationBrokerPluginTest extends FederatedTestBase {
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier0 = new MethodCalledVerifier(methodCalls);
@Override
@Before
public void setUp() throws Exception {
super.setUp();
getServer(0).registerBrokerPlugin(verifier0);
}
@Test
public void testFederationStreamStartStop() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATION_STREAM_STARTED);
getServer(0).getFederationManager().stop();
verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATION_STREAM_STOPPED);
}
@Test
public void testFederationStreamConsumerAddressUpstream() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
testFederationStreamConsumerAddress(address);
}
@Test
public void testFederationStreamConsumerAddressDownstream() throws Exception {
String address = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration(
"server0", address, "server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(1).getFederationManager().deploy();
testFederationStreamConsumerAddress(address);
}
private void testFederationStreamConsumerAddress(String address) throws Exception {
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection1.start();
connection0.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Topic topic0 = session0.createTopic(address);
Topic topic1 = session1.createTopic(address);
MessageConsumer consumer0 = session0.createConsumer(topic0);
MessageProducer producer1 = session1.createProducer(topic1);
assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
SimpleString.toSimpleString(address)).getBindings().size() == 1, 5000, 500));
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER, FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
producer1.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(5000));
consumer0.close();
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER, BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED,
AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED);
}
}
@Test
public void testFederationStreamConsumerQueueUpstream() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
testFederationStreamConsumerQueue(queueName);
}
@Test
public void testFederationStreamConsumerQueueDownstream() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration(
"server0", queueName, "server1");
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(1).getFederationManager().deploy();
testFederationStreamConsumerQueue(queueName);
}
private void testFederationStreamConsumerQueue(String queueName) throws Exception {
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection0.start();
connection1.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Queue queue0 = session0.createQueue(queueName);
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer1 = session1.createProducer(queue1);
producer1.send(session1.createTextMessage("hello"));
MessageConsumer consumer0 = session0.createConsumer(queue0);
assertNotNull(consumer0.receive(1000));
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER, FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
consumer0.close();
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER, BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED,
AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED);
}
}
@Test
public void testFederatedAddressConditional() throws Exception {
String address = getName();
getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() {
@Override
public boolean federatedAddressConditionalCreateConsumer(org.apache.activemq.artemis.core.server.Queue queue) {
//always return false for test
return false;
}
});
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection1.start();
connection0.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Topic topic0 = session0.createTopic(address);
Topic topic1 = session1.createTopic(address);
MessageConsumer consumer0 = session0.createConsumer(topic0);
MessageProducer producer1 = session1.createProducer(topic1);
assertFalse(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
SimpleString.toSimpleString(address)).getBindings().size() > 0, 2000, 500));
verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER);
producer1.send(session1.createTextMessage("hello"));
assertNull(consumer0.receive(1000));
consumer0.close();
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
}
}
@Test
public void testFederatedQueueConditional() throws Exception {
String queueName = getName();
getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() {
@Override
public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) {
//always return false for test
return false;
}
});
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection0.start();
connection1.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Queue queue0 = session0.createQueue(queueName);
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer1 = session1.createProducer(queue1);
producer1.send(session1.createTextMessage("hello"));
MessageConsumer consumer0 = session0.createConsumer(queue0);
assertNull(consumer0.receive(1000));
verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER);
consumer0.close();
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
}
}
protected ConnectionFactory getCF(int i) throws Exception {
return new ActiveMQConnectionFactory("vm://" + i);
}
}

View File

@ -3,9 +3,6 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@ -13,7 +10,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -32,6 +28,9 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer;
import org.apache.activemq.artemis.core.server.federation.FederationStream;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
@ -40,6 +39,9 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.tests.util.Wait;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
@ -101,6 +103,16 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge";
public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge";
public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge";
public static final String FEDERATION_STREAM_STARTED = "federationStreamStarted";
public static final String FEDERATION_STREAM_STOPPED = "federationStreamStopped";
public static final String BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER = "beforeCreateFederatedQueueConsumer";
public static final String AFTER_CREATE_FEDERATED_QUEUE_CONSUMER = "afterCreateFederatedQueueConsumer";
public static final String BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER = "beforeCloseFederatedQueueConsumer";
public static final String AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER = "afterCloseFederatedQueueConsumer";
public static final String FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER = "federatedAddressConditionalCreateConsumer";
public static final String FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER = "federatedQueueConditionalCreateConsumer";
public static final String BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED = "beforeFederatedQueueConsumerMessageHandled";
public static final String AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED = "afterFederatedQueueConsumerMessageHandled";
public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
super();
@ -381,16 +393,87 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_ACKNOWLEDGE_BRIDGE);
}
@Override
public void federationStreamStarted(FederationStream stream) throws ActiveMQException {
Preconditions.checkNotNull(stream);
methodCalled(FEDERATION_STREAM_STARTED);
}
@Override
public void federationStreamStopped(FederationStream stream) throws ActiveMQException {
Preconditions.checkNotNull(stream);
methodCalled(FEDERATION_STREAM_STOPPED);
}
@Override
public void beforeCreateFederatedQueueConsumer(FederatedConsumerKey key) throws ActiveMQException {
Preconditions.checkNotNull(key);
methodCalled(BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER);
}
@Override
public void afterCreateFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
methodCalled(AFTER_CREATE_FEDERATED_QUEUE_CONSUMER);
}
@Override
public void beforeCloseFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
methodCalled(BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER);
}
@Override
public void afterCloseFederatedQueueConsumer(FederatedQueueConsumer consumer) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
methodCalled(AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
}
@Override
public void beforeFederatedQueueConsumerMessageHandled(FederatedQueueConsumer consumer,
Message message) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
Preconditions.checkNotNull(message);
methodCalled(BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED);
}
@Override
public void afterFederatedQueueConsumerMessageHandled(FederatedQueueConsumer consumer,
Message message) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
Preconditions.checkNotNull(message);
methodCalled(AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED);
}
@Override
public boolean federatedAddressConditionalCreateConsumer(Queue queue) throws ActiveMQException {
Preconditions.checkNotNull(queue);
methodCalled(FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER);
return true;
}
@Override
public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
Preconditions.checkNotNull(consumer);
methodCalled(FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER);
return true;
}
public void validatePluginMethodsEquals(int count, String... names) {
validatePluginMethodsEquals(count, Wait.MAX_WAIT_MILLIS, Wait.SLEEP_MILLIS);
}
public void validatePluginMethodsEquals(int count, long timeout, long sleepMillis, String... names) {
Arrays.asList(names).forEach(name -> {
try {
Wait.waitFor(() -> count == methodCalls.getOrDefault(name, new AtomicInteger()).get());
Wait.waitFor(() -> count == methodCalls.getOrDefault(name, new AtomicInteger()).get(), timeout, sleepMillis);
} catch (Throwable ignored) {
}
assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get());
});
}
public void validatePluginMethodsAtLeast(int count, String... names) {
Arrays.asList(names).forEach(name -> {
try {