diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java new file mode 100644 index 0000000000..3365723861 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport { + + protected ActiveMQServer[] servers = new ActiveMQServer[3]; + private ActiveMQServer server0; + private ActiveMQServer server1; + private ActiveMQServer server2; + private SimpleString customNotificationQueue; + private SimpleString frameworkNotificationsQueue; + private SimpleString bridgeNotificationsQueue; + private SimpleString notificationsQueue; + + private String getServer0URL() { + return "tcp://localhost:61616"; + } + + private String getServer1URL() { + return "tcp://localhost:61617"; + } + + private String getServer2URL() { + return "tcp://localhost:61618"; + } + + @Override + public URI getBrokerAmqpConnectionURI() { + try { + return new URI(getServer0URL()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Override + protected ActiveMQServer createServer(final boolean realFiles, + final Configuration configuration, + final long pageSize, + final long maxAddressSize, + final Map settings) { + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles)); + + if (settings != null) { + for (Map.Entry setting : settings.entrySet()) { + server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue()); + } + } + + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedeliveryDelay(0).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + return server; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server0 = createServer(false, createBasicConfig()); + server1 = createServer(false, createBasicConfig()); + server2 = createServer(false, createBasicConfig()); + + servers[0] = server0; + servers[1] = server1; + servers[2] = server2; + + server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL()); + server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL()); + + server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL()); + server2.getConfiguration().addAcceptorConfiguration("acceptor", getServer2URL()); + + DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true); + + DivertConfiguration frameworkNotificationsDivertServer1 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true); + DivertConfiguration frameworkNotificationsDivertServer2 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true); + + server0.getConfiguration().addDivertConfiguration(customNotificationsDivert); + + server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer1); + server2.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer2); + + customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification"); + frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications"); + bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications"); + notificationsQueue = SimpleString.toSimpleString("Notifications"); + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2); + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1); + + server0.start(); + + server1.start(); + server2.start(); + + server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false); + server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false); + + server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false); + server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false); + + server2.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false); + server2.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false); + + server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker"))); + } + + @After + @Override + public void tearDown() throws Exception { + try { + if (server0 != null) { + server0.stop(); + } + if (server1 != null) { + server1.stop(); + } + if (server2 != null) { + server2.stop(); + } + } finally { + super.tearDown(); + } + } + + @Test + public void testSendMessageToBroker0GetFromBroker1() throws Exception { + try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) { + + session.start(); + + sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); + + ClientMessage message = consumer.receive(5000); + assertNotNull(message); + + message = consumer.receiveImmediate(); + assertNull(message); + } + } + + @Test + public void testSendMessageToBroker0GetFromBroker2() throws Exception { + try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer2URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) { + + session.start(); + + sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); + + ClientMessage message = consumer.receive(5000); + assertNotNull(message); + + message = consumer.receiveImmediate(); + assertNull(message); + } + } + + protected void setupClusterConnection(final String name, + final String address, + final MessageLoadBalancingType messageLoadBalancingType, + final int maxHops, + final boolean netty, + final int nodeFrom, + final int... nodesTo) { + setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo); + } + + protected void setupClusterConnection(final String name, + final String address, + final MessageLoadBalancingType messageLoadBalancingType, + final int maxHops, + final boolean netty, + final ClusterTestBase.ClusterConfigCallback cb, + final int nodeFrom, + final int... nodesTo) { + ActiveMQServer serverFrom = servers[nodeFrom]; + + if (serverFrom == null) { + throw new IllegalStateException("No server at node " + nodeFrom); + } + + TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); + + List pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); + Configuration config = serverFrom.getConfiguration(); + ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); + + if (cb != null) { + cb.configure(clusterConf); + } + config.getClusterConfigurations().add(clusterConf); + } + + private List getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) { + List pairs = new ArrayList<>(); + for (int element : nodesTo) { + TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); + pairs.add(serverTotc.getName()); + } + return pairs; + } + + private ClusterConnectionConfiguration createClusterConfig(final String name, + final String address, + final MessageLoadBalancingType messageLoadBalancingType, + final int maxHops, + TransportConfiguration connectorFrom, + List pairs) { + return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + } +} \ No newline at end of file