This commit is contained in:
Clebert Suconic 2021-03-22 19:13:06 -04:00
commit 6f7dc5c31b
9 changed files with 78 additions and 13 deletions

View File

@ -627,6 +627,9 @@ public final class ActiveMQDefaultConfiguration {
public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = "";
// Number of concurrent workers for a core bridge
public static int DEFAULT_BRIDGE_CONCURRENCY = 1;
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1714,4 +1717,8 @@ public final class ActiveMQDefaultConfiguration {
public static String getDefaultTemporaryQueueNamespace() {
return DEFAULT_TEMPORARY_QUEUE_NAMESPACE;
}
public static int getDefaultBridgeConcurrency() {
return DEFAULT_BRIDGE_CONCURRENCY;
}
}

View File

@ -60,6 +60,7 @@ public final class BridgeConfiguration implements Serializable {
public static String MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
public static String CALL_TIMEOUT = "call-timeout";
public static String ROUTING_TYPE = "routing-type";
public static String CONCURRENCY = "concurrency";
private String name = null;
@ -112,6 +113,8 @@ public final class BridgeConfiguration implements Serializable {
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
private int concurrency = ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency();
public BridgeConfiguration() {
}
@ -147,6 +150,7 @@ public final class BridgeConfiguration implements Serializable {
* <li>min-large-message-size: {@link #MIN_LARGE_MESSAGE_SIZE}
* <li>call-timeout: {@link #CALL_TIMEOUT}
* <li>routing-type: {@link #ROUTING_TYPE}
* <li>concurrency: {@link #CONCURRENCY}
* </ul><p>
* The {@code String}-based values will be converted to the proper value types based on the underlying property. For
* example, if you pass the value "TRUE" for the key "auto-created" the {@code String} "TRUE" will be converted to
@ -214,6 +218,8 @@ public final class BridgeConfiguration implements Serializable {
setCallTimeout(Long.parseLong(value));
} else if (key.equals(ROUTING_TYPE)) {
setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
} else if (key.equals(CONCURRENCY)) {
setConcurrency(Integer.parseInt(value));
}
}
return this;
@ -487,6 +493,21 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
/**
* @return the bridge concurrency
*/
public int getConcurrency() {
return concurrency;
}
/**
* @param concurrency the bridge concurrency to set
*/
public BridgeConfiguration setConcurrency(int concurrency) {
this.concurrency = concurrency;
return this;
}
/**
* At this point this is only changed on testcases
* The bridge shouldn't be sending blocking anyways
@ -546,6 +567,7 @@ public final class BridgeConfiguration implements Serializable {
builder.add(MAX_RETRY_INTERVAL, getMaxRetryInterval());
builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize());
builder.add(CALL_TIMEOUT, getCallTimeout());
builder.add(CONCURRENCY, getConcurrency());
// complex fields (only serialize if value is not null)
@ -639,6 +661,7 @@ public final class BridgeConfiguration implements Serializable {
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
result = prime * result + ((user == null) ? 0 : user.hashCode());
result = prime * result + concurrency;
return result;
}
@ -722,6 +745,8 @@ public final class BridgeConfiguration implements Serializable {
return false;
} else if (!user.equals(other.user))
return false;
if (concurrency != other.concurrency)
return false;
return true;
}

View File

@ -2085,6 +2085,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
int concurrency = getInteger(brNode, "concurrency", ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), Validators.GT_ZERO);
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
String password = null;
@ -2151,7 +2152,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
.setHA(ha)
.setUser(user)
.setPassword(password)
.setRoutingType(routingType);
.setRoutingType(routingType)
.setConcurrency(concurrency);
if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);

View File

@ -475,16 +475,22 @@ public final class ClusterManager implements ActiveMQComponent {
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType());
for (int i = 0; i < config.getConcurrency(); i++) {
String name = (config.getName() + "-" + i);
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(),
config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(name), queue,
executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()),
SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer,
config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server,
config.getRoutingType());
bridges.put(name, bridge);
managementService.registerBridge(bridge, config);
bridge.start();
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
bridge.start();
if (server.hasBrokerBridgePlugins()) {
server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge));
if (server.hasBrokerBridgePlugins()) {
server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge));
}
}
}

View File

@ -1676,6 +1676,16 @@
</xsd:complexType>
</xsd:element>
</xsd:choice>
<xsd:element name="concurrency" type="xsd:int" default="1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Number of concurrent workers, more workers can help increase throughput on high latency networks.
Defaults to 1
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required">

View File

@ -66,6 +66,7 @@ public class BridgeConfigurationTest {
Assert.assertEquals(11, bridgeConfiguration.getMinLargeMessageSize());
Assert.assertEquals(12, bridgeConfiguration.getCallTimeout());
Assert.assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType());
Assert.assertEquals(1, bridgeConfiguration.getConcurrency());
}
@Test
@ -103,6 +104,7 @@ public class BridgeConfigurationTest {
Assert.assertEquals("2000", jsonObject.get(BridgeConfiguration.MAX_RETRY_INTERVAL).toString());
Assert.assertEquals("102400", jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString());
Assert.assertEquals("30000", jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString());
Assert.assertEquals("1", jsonObject.get(BridgeConfiguration.CONCURRENCY).toString());
// also should contain default non-null values of string fields
Assert.assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", jsonObject.get(BridgeConfiguration.USER).toString());
@ -188,6 +190,7 @@ public class BridgeConfigurationTest {
objectBuilder.add(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE, 11);
objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12);
objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
return objectBuilder.build();
}

View File

@ -282,6 +282,7 @@ Name | Description | Default
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
[concurrency](core-bridges.md) | Concurrency of the bridge | 1
## broadcast-group type
@ -424,4 +425,4 @@ Name | Description | Default
[address](message-grouping.md#clustered-grouping) | A reference to a `cluster-connection` `address` | n/a
[timeout](message-grouping.md#clustered-grouping) | How long to wait for a decision | 5000
[group-timeout](message-grouping.md#clustered-grouping) | How long a group binding will be used. | -1 (disabled)
[reaper-period](message-grouping.md#clustered-grouping) | How often the reaper will be run to check for timed out group bindings. Only valid for `LOCAL` handlers. | 30000
[reaper-period](message-grouping.md#clustered-grouping) | How often the reaper will be run to check for timed out group bindings. Only valid for `LOCAL` handlers. | 30000

View File

@ -61,6 +61,7 @@ actually from the bridge example):
<user>foouser</user>
<password>foopassword</password>
<routing-type>PASS</routing-type>
<concurrency>1</concurrency>
<static-connectors>
<connector-ref>remote-connector</connector-ref>
</static-connectors>
@ -185,6 +186,14 @@ Let's take a look at all the parameters in turn:
flexibility to deal with any situation. Valid values are `ANYCAST`,
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
- `concurrency`. For bridging high latency networks, and particularly for destinations
with a high throughput, more workers might have to be commited to the bridge. This is
done with the concurrency parameter. Increasing the concurrency will get reflected
by more consumers and producers showing up on the bridged destination, allowing
for increased parallelism across high latency networks.
Default=1
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
connect the bridge to the target server.

View File

@ -131,6 +131,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
int destinationMessageCount) throws Exception {
SimpleString source = SimpleString.toSimpleString("source");
SimpleString destination = SimpleString.toSimpleString("destination");
int concurrency = 2;
server0.createQueue(new QueueConfiguration(source).setRoutingType(sourceRoutingType));
server1.createQueue(new QueueConfiguration(destination).setRoutingType(destinationRoutingType));
@ -141,6 +142,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
.setForwardingAddress(destination.toString())
.setQueueName(source.toString())
.setConfirmationWindowSize(10)
.setConcurrency(concurrency)
.setStaticConnectors(Arrays.asList("connector")));
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
@ -149,10 +151,10 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer(source)) {
producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
}
Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100);
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge-0").getMetrics().getMessagesAcknowledged() == 1,2000, 100);
Thread.sleep(sleepTime);
assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
assertTrue(Wait.waitFor(() -> server0.locateQueue(source).getConsumerCount() == concurrency, 2000, 100));
}
}