diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index af047fa0d3..420acac70b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -627,6 +627,9 @@ public final class ActiveMQDefaultConfiguration { public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = ""; + // Number of concurrent workers for a core bridge + public static int DEFAULT_BRIDGE_CONCURRENCY = 1; + /** * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1714,4 +1717,8 @@ public final class ActiveMQDefaultConfiguration { public static String getDefaultTemporaryQueueNamespace() { return DEFAULT_TEMPORARY_QUEUE_NAMESPACE; } + + public static int getDefaultBridgeConcurrency() { + return DEFAULT_BRIDGE_CONCURRENCY; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java index 9e3d63e204..aaafebd418 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java @@ -60,6 +60,7 @@ public final class BridgeConfiguration implements Serializable { public static String MIN_LARGE_MESSAGE_SIZE = "min-large-message-size"; public static String CALL_TIMEOUT = "call-timeout"; public static String ROUTING_TYPE = "routing-type"; + public static String CONCURRENCY = "concurrency"; private String name = null; @@ -112,6 +113,8 @@ public final class BridgeConfiguration implements Serializable { private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()); + private int concurrency = ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(); + public BridgeConfiguration() { } @@ -147,6 +150,7 @@ public final class BridgeConfiguration implements Serializable { *
  • min-large-message-size: {@link #MIN_LARGE_MESSAGE_SIZE} *
  • call-timeout: {@link #CALL_TIMEOUT} *
  • routing-type: {@link #ROUTING_TYPE} + *
  • concurrency: {@link #CONCURRENCY} *

    * The {@code String}-based values will be converted to the proper value types based on the underlying property. For * example, if you pass the value "TRUE" for the key "auto-created" the {@code String} "TRUE" will be converted to @@ -214,6 +218,8 @@ public final class BridgeConfiguration implements Serializable { setCallTimeout(Long.parseLong(value)); } else if (key.equals(ROUTING_TYPE)) { setRoutingType(ComponentConfigurationRoutingType.valueOf(value)); + } else if (key.equals(CONCURRENCY)) { + setConcurrency(Integer.parseInt(value)); } } return this; @@ -487,6 +493,21 @@ public final class BridgeConfiguration implements Serializable { return this; } + /** + * @return the bridge concurrency + */ + public int getConcurrency() { + return concurrency; + } + + /** + * @param concurrency the bridge concurrency to set + */ + public BridgeConfiguration setConcurrency(int concurrency) { + this.concurrency = concurrency; + return this; + } + /** * At this point this is only changed on testcases * The bridge shouldn't be sending blocking anyways @@ -546,6 +567,7 @@ public final class BridgeConfiguration implements Serializable { builder.add(MAX_RETRY_INTERVAL, getMaxRetryInterval()); builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize()); builder.add(CALL_TIMEOUT, getCallTimeout()); + builder.add(CONCURRENCY, getConcurrency()); // complex fields (only serialize if value is not null) @@ -639,6 +661,7 @@ public final class BridgeConfiguration implements Serializable { result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode()); result = prime * result + (useDuplicateDetection ? 1231 : 1237); result = prime * result + ((user == null) ? 0 : user.hashCode()); + result = prime * result + concurrency; return result; } @@ -722,6 +745,8 @@ public final class BridgeConfiguration implements Serializable { return false; } else if (!user.equals(other.user)) return false; + if (concurrency != other.concurrency) + return false; return true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c5fd448935..6994860c47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2085,6 +2085,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE)); + int concurrency = getInteger(brNode, "concurrency", ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), Validators.GT_ZERO); NodeList clusterPassNodes = brNode.getElementsByTagName("password"); String password = null; @@ -2151,7 +2152,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { .setHA(ha) .setUser(user) .setPassword(password) - .setRoutingType(routingType); + .setRoutingType(routingType) + .setConcurrency(concurrency); if (!staticConnectorNames.isEmpty()) { config.setStaticConnectors(staticConnectorNames); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index ac483024e6..05008d0244 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -475,16 +475,22 @@ public final class ClusterManager implements ActiveMQComponent { clusterLocators.add(serverLocator); - Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType()); + for (int i = 0; i < config.getConcurrency(); i++) { + String name = (config.getName() + "-" + i); + Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), + config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), + config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(name), queue, + executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), + SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, + config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, + config.getRoutingType()); + bridges.put(name, bridge); + managementService.registerBridge(bridge, config); + bridge.start(); - bridges.put(config.getName(), bridge); - - managementService.registerBridge(bridge, config); - - bridge.start(); - - if (server.hasBrokerBridgePlugins()) { - server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge)); + } } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6bf6097494..6788727bb4 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1676,6 +1676,16 @@ + + + + + Number of concurrent workers, more workers can help increase throughput on high latency networks. + Defaults to 1 + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java index 91e6f5cd87..85ed6b9378 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java @@ -66,6 +66,7 @@ public class BridgeConfigurationTest { Assert.assertEquals(11, bridgeConfiguration.getMinLargeMessageSize()); Assert.assertEquals(12, bridgeConfiguration.getCallTimeout()); Assert.assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType()); + Assert.assertEquals(1, bridgeConfiguration.getConcurrency()); } @Test @@ -103,6 +104,7 @@ public class BridgeConfigurationTest { Assert.assertEquals("2000", jsonObject.get(BridgeConfiguration.MAX_RETRY_INTERVAL).toString()); Assert.assertEquals("102400", jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString()); Assert.assertEquals("30000", jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString()); + Assert.assertEquals("1", jsonObject.get(BridgeConfiguration.CONCURRENCY).toString()); // also should contain default non-null values of string fields Assert.assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", jsonObject.get(BridgeConfiguration.USER).toString()); @@ -188,6 +190,7 @@ public class BridgeConfigurationTest { objectBuilder.add(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE, 11); objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12); objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST"); + objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1); return objectBuilder.build(); } diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 4027a5037a..ba7ebb6989 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -282,6 +282,7 @@ Name | Description | Default [password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a [reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10 [routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS` +[concurrency](core-bridges.md) | Concurrency of the bridge | 1 ## broadcast-group type @@ -424,4 +425,4 @@ Name | Description | Default [address](message-grouping.md#clustered-grouping) | A reference to a `cluster-connection` `address` | n/a [timeout](message-grouping.md#clustered-grouping) | How long to wait for a decision | 5000 [group-timeout](message-grouping.md#clustered-grouping) | How long a group binding will be used. | -1 (disabled) -[reaper-period](message-grouping.md#clustered-grouping) | How often the reaper will be run to check for timed out group bindings. Only valid for `LOCAL` handlers. | 30000 \ No newline at end of file +[reaper-period](message-grouping.md#clustered-grouping) | How often the reaper will be run to check for timed out group bindings. Only valid for `LOCAL` handlers. | 30000 diff --git a/docs/user-manual/en/core-bridges.md b/docs/user-manual/en/core-bridges.md index d51e0bd67d..98b13f5b8d 100644 --- a/docs/user-manual/en/core-bridges.md +++ b/docs/user-manual/en/core-bridges.md @@ -61,6 +61,7 @@ actually from the bridge example): foouser foopassword PASS + 1 remote-connector @@ -185,6 +186,14 @@ Let's take a look at all the parameters in turn: flexibility to deal with any situation. Valid values are `ANYCAST`, `MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`. +- `concurrency`. For bridging high latency networks, and particularly for destinations + with a high throughput, more workers might have to be commited to the bridge. This is + done with the concurrency parameter. Increasing the concurrency will get reflected + by more consumers and producers showing up on the bridged destination, allowing + for increased parallelism across high latency networks. + + Default=1 + - `static-connectors` or `discovery-group-ref`. Pick either of these options to connect the bridge to the target server. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java index 3cdfb5ab04..4bd1d03537 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java @@ -131,6 +131,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase { int destinationMessageCount) throws Exception { SimpleString source = SimpleString.toSimpleString("source"); SimpleString destination = SimpleString.toSimpleString("destination"); + int concurrency = 2; server0.createQueue(new QueueConfiguration(source).setRoutingType(sourceRoutingType)); server1.createQueue(new QueueConfiguration(destination).setRoutingType(destinationRoutingType)); @@ -141,6 +142,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase { .setForwardingAddress(destination.toString()) .setQueueName(source.toString()) .setConfirmationWindowSize(10) + .setConcurrency(concurrency) .setStaticConnectors(Arrays.asList("connector"))); try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL()); @@ -149,10 +151,10 @@ public class BridgeRoutingTest extends ActiveMQTestBase { ClientProducer producer = session.createProducer(source)) { producer.send(session.createMessage(true).setRoutingType(sourceRoutingType)); } - Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100); - Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100); + Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge-0").getMetrics().getMessagesAcknowledged() == 1,2000, 100); Thread.sleep(sleepTime); assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100)); + assertTrue(Wait.waitFor(() -> server0.locateQueue(source).getConsumerCount() == concurrency, 2000, 100)); } } \ No newline at end of file