ARTEMIS-1621 Make producerWindowSize configurable on clusterconnection bridges

The cluster connection bridge hard codes its producerWindowSize to -1
(meaning no producer flow control) even if you configure it otherwise.
This commit is contained in:
Howard Gao 2018-01-19 16:36:47 +08:00 committed by Justin Bertram
parent 31ce365334
commit 98ce31bf58
4 changed files with 133 additions and 2 deletions

View File

@ -788,8 +788,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier); targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
targetLocator.setMinLargeMessageSize(minLargeMessageSize); targetLocator.setMinLargeMessageSize(minLargeMessageSize);
// No producer flow control on the bridges, as we don't want to lock the queues // No producer flow control on the bridges by default, as we don't want to lock the queues
targetLocator.setProducerWindowSize(-1); targetLocator.setProducerWindowSize(this.producerWindowSize);
targetLocator.setAfterConnectionInternalListener(this); targetLocator.setAfterConnectionInternalListener(this);

View File

@ -61,4 +61,11 @@ public class ClusterConnectionConfigurationTest {
Assert.assertEquals("myGroup", configuration.getDiscoveryGroupName()); Assert.assertEquals("myGroup", configuration.getDiscoveryGroupName());
Assert.assertEquals(132, configuration.getMinLargeMessageSize()); Assert.assertEquals(132, configuration.getMinLargeMessageSize());
} }
@Test
public void testClusterConnectionProducerWindowSize() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("static:(tcp://localhost:6556)?producerWindowSize=1234"), null);
Assert.assertEquals(1234, configuration.getProducerWindowSize());
}
} }

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
public class ClusterConnectionConfigTest extends ClusterTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
start();
}
private void start() throws Exception {
setupServers();
}
protected boolean isNetty() {
return true;
}
protected void setupServers() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
}
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType, final ClusterConfigCallback cb) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), cb, 0, 1, 2);
setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), cb, 1, 0, 2);
}
@Test
public void testRedistributionFlowControl() throws Exception {
final int producerWindow = new Random().nextInt(Integer.MAX_VALUE);
System.out.println("window is: " + producerWindow);
setupCluster(MessageLoadBalancingType.ON_DEMAND, (ClusterConnectionConfiguration cfg) -> {
cfg.setProducerWindowSize(producerWindow);
});
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
addConsumer(1, 1, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
send(0, "queues.testaddress", 1, false, null);
//receiveMessages(consumers[1].consumer, 0, 1, true);
Thread.sleep(5000);
System.out.println("checking records..........................");
makeSureForwardingFlowControl(producerWindow, 0, 1);
removeConsumer(1);
}
private void makeSureForwardingFlowControl(int producerWindow, int... indices) throws NoSuchFieldException, IllegalAccessException {
for (int i : indices) {
ClusterConnectionImpl cc = (ClusterConnectionImpl) servers[i].getClusterManager().getClusterConnection("cluster" + i);
Map<String, MessageFlowRecord> map = cc.getRecords();
assertEquals(1, map.size());
MessageFlowRecord record = map.entrySet().iterator().next().getValue();
Field f = record.getClass().getDeclaredField("targetLocator"); //NoSuchFieldException
f.setAccessible(true);
ServerLocatorInternal targetLocator = (ServerLocatorInternal) f.get(record);
assertEquals(producerWindow, targetLocator.getProducerWindowSize());
}
}
}

View File

@ -1734,6 +1734,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final boolean netty, final boolean netty,
final int nodeFrom, final int nodeFrom,
final int... nodesTo) { final int... nodesTo) {
setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
}
protected void setupClusterConnection(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,
final int maxHops,
final boolean netty,
final ClusterConfigCallback cb,
final int nodeFrom,
final int... nodesTo) {
ActiveMQServer serverFrom = servers[nodeFrom]; ActiveMQServer serverFrom = servers[nodeFrom];
if (serverFrom == null) { if (serverFrom == null) {
@ -1752,6 +1763,9 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
Configuration config = serverFrom.getConfiguration(); Configuration config = serverFrom.getConfiguration();
ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
if (cb != null) {
cb.configure(clusterConf);
}
config.getClusterConfigurations().add(clusterConf); config.getClusterConfigurations().add(clusterConf);
} }
@ -1912,4 +1926,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
int port = TransportConstants.DEFAULT_PORT + node; int port = TransportConstants.DEFAULT_PORT + node;
return "tcp://localhost:" + port; return "tcp://localhost:" + port;
} }
public interface ClusterConfigCallback {
void configure(ClusterConnectionConfiguration config);
}
} }