diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index bcfb58dd62..454ba6f0ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -788,8 +788,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier); targetLocator.setMinLargeMessageSize(minLargeMessageSize); - // No producer flow control on the bridges, as we don't want to lock the queues - targetLocator.setProducerWindowSize(-1); + // No producer flow control on the bridges by default, as we don't want to lock the queues + targetLocator.setProducerWindowSize(this.producerWindowSize); targetLocator.setAfterConnectionInternalListener(this); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java index e6b97ebf23..b202924d6e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java @@ -61,4 +61,11 @@ public class ClusterConnectionConfigurationTest { Assert.assertEquals("myGroup", configuration.getDiscoveryGroupName()); Assert.assertEquals(132, configuration.getMinLargeMessageSize()); } + + @Test + public void testClusterConnectionProducerWindowSize() throws Exception { + ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); + ClusterConnectionConfiguration configuration = parser.newObject(new URI("static:(tcp://localhost:6556)?producerWindowSize=1234"), null); + Assert.assertEquals(1234, configuration.getProducerWindowSize()); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterConnectionConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterConnectionConfigTest.java new file mode 100644 index 0000000000..db775114f3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterConnectionConfigTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Random; + +public class ClusterConnectionConfigTest extends ClusterTestBase { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + start(); + } + + private void start() throws Exception { + setupServers(); + } + + protected boolean isNetty() { + return true; + } + + protected void setupServers() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + } + + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType, final ClusterConfigCallback cb) throws Exception { + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), cb, 0, 1, 2); + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), cb, 1, 0, 2); + } + + @Test + public void testRedistributionFlowControl() throws Exception { + final int producerWindow = new Random().nextInt(Integer.MAX_VALUE); + System.out.println("window is: " + producerWindow); + setupCluster(MessageLoadBalancingType.ON_DEMAND, (ClusterConnectionConfiguration cfg) -> { + cfg.setProducerWindowSize(producerWindow); + }); + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + + addConsumer(1, 1, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 0, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 0, false); + + send(0, "queues.testaddress", 1, false, null); + + //receiveMessages(consumers[1].consumer, 0, 1, true); + Thread.sleep(5000); + System.out.println("checking records.........................."); + makeSureForwardingFlowControl(producerWindow, 0, 1); + + + removeConsumer(1); + } + + private void makeSureForwardingFlowControl(int producerWindow, int... indices) throws NoSuchFieldException, IllegalAccessException { + for (int i : indices) { + ClusterConnectionImpl cc = (ClusterConnectionImpl) servers[i].getClusterManager().getClusterConnection("cluster" + i); + Map map = cc.getRecords(); + assertEquals(1, map.size()); + MessageFlowRecord record = map.entrySet().iterator().next().getValue(); + + Field f = record.getClass().getDeclaredField("targetLocator"); //NoSuchFieldException + f.setAccessible(true); + ServerLocatorInternal targetLocator = (ServerLocatorInternal) f.get(record); + assertEquals(producerWindow, targetLocator.getProducerWindowSize()); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 1d6cf4f876..89d5175dbb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1734,6 +1734,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { final boolean netty, final int nodeFrom, final int... nodesTo) { + setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo); + } + + protected void setupClusterConnection(final String name, + final String address, + final MessageLoadBalancingType messageLoadBalancingType, + final int maxHops, + final boolean netty, + final ClusterConfigCallback cb, + final int nodeFrom, + final int... nodesTo) { ActiveMQServer serverFrom = servers[nodeFrom]; if (serverFrom == null) { @@ -1752,6 +1763,9 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); + if (cb != null) { + cb.configure(clusterConf); + } config.getClusterConfigurations().add(clusterConf); } @@ -1912,4 +1926,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { int port = TransportConstants.DEFAULT_PORT + node; return "tcp://localhost:" + port; } + + public interface ClusterConfigCallback { + void configure(ClusterConnectionConfiguration config); + } }