ARTEMIS-3198 Add concurrency option on core bridges
This commit is contained in:
parent
af7857e3d1
commit
e9e1e476ee
|
@ -627,6 +627,9 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
|
|
||||||
public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = "";
|
public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = "";
|
||||||
|
|
||||||
|
// Number of concurrent workers for a core bridge
|
||||||
|
public static int DEFAULT_BRIDGE_CONCURRENCY = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
|
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
|
||||||
*/
|
*/
|
||||||
|
@ -1714,4 +1717,8 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
public static String getDefaultTemporaryQueueNamespace() {
|
public static String getDefaultTemporaryQueueNamespace() {
|
||||||
return DEFAULT_TEMPORARY_QUEUE_NAMESPACE;
|
return DEFAULT_TEMPORARY_QUEUE_NAMESPACE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static int getDefaultBridgeConcurrency() {
|
||||||
|
return DEFAULT_BRIDGE_CONCURRENCY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
public static String MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
|
public static String MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
|
||||||
public static String CALL_TIMEOUT = "call-timeout";
|
public static String CALL_TIMEOUT = "call-timeout";
|
||||||
public static String ROUTING_TYPE = "routing-type";
|
public static String ROUTING_TYPE = "routing-type";
|
||||||
|
public static String CONCURRENCY = "concurrency";
|
||||||
|
|
||||||
private String name = null;
|
private String name = null;
|
||||||
|
|
||||||
|
@ -112,6 +113,8 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
|
|
||||||
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
|
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
|
||||||
|
|
||||||
|
private int concurrency = ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency();
|
||||||
|
|
||||||
public BridgeConfiguration() {
|
public BridgeConfiguration() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,6 +150,7 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
* <li>min-large-message-size: {@link #MIN_LARGE_MESSAGE_SIZE}
|
* <li>min-large-message-size: {@link #MIN_LARGE_MESSAGE_SIZE}
|
||||||
* <li>call-timeout: {@link #CALL_TIMEOUT}
|
* <li>call-timeout: {@link #CALL_TIMEOUT}
|
||||||
* <li>routing-type: {@link #ROUTING_TYPE}
|
* <li>routing-type: {@link #ROUTING_TYPE}
|
||||||
|
* <li>concurrency: {@link #CONCURRENCY}
|
||||||
* </ul><p>
|
* </ul><p>
|
||||||
* The {@code String}-based values will be converted to the proper value types based on the underlying property. For
|
* The {@code String}-based values will be converted to the proper value types based on the underlying property. For
|
||||||
* example, if you pass the value "TRUE" for the key "auto-created" the {@code String} "TRUE" will be converted to
|
* example, if you pass the value "TRUE" for the key "auto-created" the {@code String} "TRUE" will be converted to
|
||||||
|
@ -214,6 +218,8 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
setCallTimeout(Long.parseLong(value));
|
setCallTimeout(Long.parseLong(value));
|
||||||
} else if (key.equals(ROUTING_TYPE)) {
|
} else if (key.equals(ROUTING_TYPE)) {
|
||||||
setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
|
setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
|
||||||
|
} else if (key.equals(CONCURRENCY)) {
|
||||||
|
setConcurrency(Integer.parseInt(value));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
|
@ -487,6 +493,21 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the bridge concurrency
|
||||||
|
*/
|
||||||
|
public int getConcurrency() {
|
||||||
|
return concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param concurrency the bridge concurrency to set
|
||||||
|
*/
|
||||||
|
public BridgeConfiguration setConcurrency(int concurrency) {
|
||||||
|
this.concurrency = concurrency;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* At this point this is only changed on testcases
|
* At this point this is only changed on testcases
|
||||||
* The bridge shouldn't be sending blocking anyways
|
* The bridge shouldn't be sending blocking anyways
|
||||||
|
@ -546,6 +567,7 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
builder.add(MAX_RETRY_INTERVAL, getMaxRetryInterval());
|
builder.add(MAX_RETRY_INTERVAL, getMaxRetryInterval());
|
||||||
builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize());
|
builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize());
|
||||||
builder.add(CALL_TIMEOUT, getCallTimeout());
|
builder.add(CALL_TIMEOUT, getCallTimeout());
|
||||||
|
builder.add(CONCURRENCY, getConcurrency());
|
||||||
|
|
||||||
// complex fields (only serialize if value is not null)
|
// complex fields (only serialize if value is not null)
|
||||||
|
|
||||||
|
@ -639,6 +661,7 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
|
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
|
||||||
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
|
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
|
||||||
result = prime * result + ((user == null) ? 0 : user.hashCode());
|
result = prime * result + ((user == null) ? 0 : user.hashCode());
|
||||||
|
result = prime * result + concurrency;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -722,6 +745,8 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
return false;
|
return false;
|
||||||
} else if (!user.equals(other.user))
|
} else if (!user.equals(other.user))
|
||||||
return false;
|
return false;
|
||||||
|
if (concurrency != other.concurrency)
|
||||||
|
return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2085,6 +2085,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
||||||
|
|
||||||
|
int concurrency = getInteger(brNode, "concurrency", ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), Validators.GT_ZERO);
|
||||||
|
|
||||||
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
|
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
|
||||||
String password = null;
|
String password = null;
|
||||||
|
@ -2151,7 +2152,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
.setHA(ha)
|
.setHA(ha)
|
||||||
.setUser(user)
|
.setUser(user)
|
||||||
.setPassword(password)
|
.setPassword(password)
|
||||||
.setRoutingType(routingType);
|
.setRoutingType(routingType)
|
||||||
|
.setConcurrency(concurrency);
|
||||||
|
|
||||||
if (!staticConnectorNames.isEmpty()) {
|
if (!staticConnectorNames.isEmpty()) {
|
||||||
config.setStaticConnectors(staticConnectorNames);
|
config.setStaticConnectors(staticConnectorNames);
|
||||||
|
|
|
@ -475,18 +475,24 @@ public final class ClusterManager implements ActiveMQComponent {
|
||||||
|
|
||||||
clusterLocators.add(serverLocator);
|
clusterLocators.add(serverLocator);
|
||||||
|
|
||||||
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType());
|
for (int i = 0; i < config.getConcurrency(); i++) {
|
||||||
|
String name = (config.getName() + "-" + i);
|
||||||
bridges.put(config.getName(), bridge);
|
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(),
|
||||||
|
config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(),
|
||||||
|
config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(name), queue,
|
||||||
|
executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()),
|
||||||
|
SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer,
|
||||||
|
config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server,
|
||||||
|
config.getRoutingType());
|
||||||
|
bridges.put(name, bridge);
|
||||||
managementService.registerBridge(bridge, config);
|
managementService.registerBridge(bridge, config);
|
||||||
|
|
||||||
bridge.start();
|
bridge.start();
|
||||||
|
|
||||||
if (server.hasBrokerBridgePlugins()) {
|
if (server.hasBrokerBridgePlugins()) {
|
||||||
server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge));
|
server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {
|
public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {
|
||||||
|
|
||||||
|
|
|
@ -1676,6 +1676,16 @@
|
||||||
</xsd:complexType>
|
</xsd:complexType>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
</xsd:choice>
|
</xsd:choice>
|
||||||
|
|
||||||
|
<xsd:element name="concurrency" type="xsd:int" default="1" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
Number of concurrent workers, more workers can help increase throughput on high latency networks.
|
||||||
|
Defaults to 1
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
</xsd:sequence>
|
</xsd:sequence>
|
||||||
|
|
||||||
<xsd:attribute name="name" type="xsd:ID" use="required">
|
<xsd:attribute name="name" type="xsd:ID" use="required">
|
||||||
|
|
|
@ -66,6 +66,7 @@ public class BridgeConfigurationTest {
|
||||||
Assert.assertEquals(11, bridgeConfiguration.getMinLargeMessageSize());
|
Assert.assertEquals(11, bridgeConfiguration.getMinLargeMessageSize());
|
||||||
Assert.assertEquals(12, bridgeConfiguration.getCallTimeout());
|
Assert.assertEquals(12, bridgeConfiguration.getCallTimeout());
|
||||||
Assert.assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType());
|
Assert.assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType());
|
||||||
|
Assert.assertEquals(1, bridgeConfiguration.getConcurrency());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -103,6 +104,7 @@ public class BridgeConfigurationTest {
|
||||||
Assert.assertEquals("2000", jsonObject.get(BridgeConfiguration.MAX_RETRY_INTERVAL).toString());
|
Assert.assertEquals("2000", jsonObject.get(BridgeConfiguration.MAX_RETRY_INTERVAL).toString());
|
||||||
Assert.assertEquals("102400", jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString());
|
Assert.assertEquals("102400", jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString());
|
||||||
Assert.assertEquals("30000", jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString());
|
Assert.assertEquals("30000", jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString());
|
||||||
|
Assert.assertEquals("1", jsonObject.get(BridgeConfiguration.CONCURRENCY).toString());
|
||||||
|
|
||||||
// also should contain default non-null values of string fields
|
// also should contain default non-null values of string fields
|
||||||
Assert.assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", jsonObject.get(BridgeConfiguration.USER).toString());
|
Assert.assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", jsonObject.get(BridgeConfiguration.USER).toString());
|
||||||
|
@ -188,6 +190,7 @@ public class BridgeConfigurationTest {
|
||||||
objectBuilder.add(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE, 11);
|
objectBuilder.add(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE, 11);
|
||||||
objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12);
|
objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12);
|
||||||
objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
|
objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
|
||||||
|
objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
|
||||||
|
|
||||||
return objectBuilder.build();
|
return objectBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -282,6 +282,7 @@ Name | Description | Default
|
||||||
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
|
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
|
||||||
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
|
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
|
||||||
[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
|
[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
|
||||||
|
[concurrency](core-bridges.md) | Concurrency of the bridge | 1
|
||||||
|
|
||||||
## broadcast-group type
|
## broadcast-group type
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,7 @@ actually from the bridge example):
|
||||||
<user>foouser</user>
|
<user>foouser</user>
|
||||||
<password>foopassword</password>
|
<password>foopassword</password>
|
||||||
<routing-type>PASS</routing-type>
|
<routing-type>PASS</routing-type>
|
||||||
|
<concurrency>1</concurrency>
|
||||||
<static-connectors>
|
<static-connectors>
|
||||||
<connector-ref>remote-connector</connector-ref>
|
<connector-ref>remote-connector</connector-ref>
|
||||||
</static-connectors>
|
</static-connectors>
|
||||||
|
@ -185,6 +186,14 @@ Let's take a look at all the parameters in turn:
|
||||||
flexibility to deal with any situation. Valid values are `ANYCAST`,
|
flexibility to deal with any situation. Valid values are `ANYCAST`,
|
||||||
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
|
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
|
||||||
|
|
||||||
|
- `concurrency`. For bridging high latency networks, and particularly for destinations
|
||||||
|
with a high throughput, more workers might have to be commited to the bridge. This is
|
||||||
|
done with the concurrency parameter. Increasing the concurrency will get reflected
|
||||||
|
by more consumers and producers showing up on the bridged destination, allowing
|
||||||
|
for increased parallelism across high latency networks.
|
||||||
|
|
||||||
|
Default=1
|
||||||
|
|
||||||
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
|
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
|
||||||
connect the bridge to the target server.
|
connect the bridge to the target server.
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
|
||||||
int destinationMessageCount) throws Exception {
|
int destinationMessageCount) throws Exception {
|
||||||
SimpleString source = SimpleString.toSimpleString("source");
|
SimpleString source = SimpleString.toSimpleString("source");
|
||||||
SimpleString destination = SimpleString.toSimpleString("destination");
|
SimpleString destination = SimpleString.toSimpleString("destination");
|
||||||
|
int concurrency = 2;
|
||||||
|
|
||||||
server0.createQueue(new QueueConfiguration(source).setRoutingType(sourceRoutingType));
|
server0.createQueue(new QueueConfiguration(source).setRoutingType(sourceRoutingType));
|
||||||
server1.createQueue(new QueueConfiguration(destination).setRoutingType(destinationRoutingType));
|
server1.createQueue(new QueueConfiguration(destination).setRoutingType(destinationRoutingType));
|
||||||
|
@ -141,6 +142,7 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
|
||||||
.setForwardingAddress(destination.toString())
|
.setForwardingAddress(destination.toString())
|
||||||
.setQueueName(source.toString())
|
.setQueueName(source.toString())
|
||||||
.setConfirmationWindowSize(10)
|
.setConfirmationWindowSize(10)
|
||||||
|
.setConcurrency(concurrency)
|
||||||
.setStaticConnectors(Arrays.asList("connector")));
|
.setStaticConnectors(Arrays.asList("connector")));
|
||||||
|
|
||||||
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
|
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
|
||||||
|
@ -149,10 +151,10 @@ public class BridgeRoutingTest extends ActiveMQTestBase {
|
||||||
ClientProducer producer = session.createProducer(source)) {
|
ClientProducer producer = session.createProducer(source)) {
|
||||||
producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
|
producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
|
||||||
}
|
}
|
||||||
|
|
||||||
Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
|
Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
|
||||||
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100);
|
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge-0").getMetrics().getMessagesAcknowledged() == 1,2000, 100);
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
|
assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
|
||||||
|
assertTrue(Wait.waitFor(() -> server0.locateQueue(source).getConsumerCount() == concurrency, 2000, 100));
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue