ARTEMIS-4182 support client ID config on bridges

Allow the client ID to be configured on normal bridge as well as
cluster-connection bridges. This makes the bridge connection easier to
identify on the target broker.
This commit is contained in:
Justin Bertram 2024-09-04 11:14:39 -05:00 committed by Robbie Gemmell
parent 2365ebf0a7
commit 46c7cb445c
21 changed files with 229 additions and 29 deletions

View File

@ -67,6 +67,7 @@ public final class BridgeConfiguration implements Serializable {
public static String CONCURRENCY = "concurrency";
public static String CONFIGURATION_MANAGED = "configuration-managed";
public static String PENDING_ACK_TIMEOUT = "pending-ack-timeout";
public static String CLIENT_ID = "client-id";
private String name = null;
@ -127,6 +128,8 @@ public final class BridgeConfiguration implements Serializable {
private boolean configurationManaged = true;
private String clientId = null;
public BridgeConfiguration() {
}
@ -159,6 +162,7 @@ public final class BridgeConfiguration implements Serializable {
concurrency = other.concurrency;
configurationManaged = other.configurationManaged;
pendingAckTimeout = other.pendingAckTimeout;
clientId = other.clientId;
}
public BridgeConfiguration(String name) {
@ -194,6 +198,7 @@ public final class BridgeConfiguration implements Serializable {
* <li>call-timeout: {@link #CALL_TIMEOUT}
* <li>routing-type: {@link #ROUTING_TYPE}
* <li>concurrency: {@link #CONCURRENCY}
* <li>client-id: {@link #CLIENT_ID}
* </ul><p>
* The {@code String}-based values will be converted to the proper value types based on the underlying property. For
* example, if you pass the value "TRUE" for the key "auto-created" the {@code String} "TRUE" will be converted to
@ -267,6 +272,8 @@ public final class BridgeConfiguration implements Serializable {
setConcurrency(Integer.parseInt(value));
} else if (key.equals(PENDING_ACK_TIMEOUT)) {
setPendingAckTimeout(Long.parseLong(value));
} else if (key.equals(CLIENT_ID)) {
setClientId(value);
}
}
return this;
@ -591,6 +598,21 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
/**
* @return the bridge client ID
*/
public String getClientId() {
return clientId;
}
/**
* @param clientId the bridge clientId to set
*/
public BridgeConfiguration setClientId(String clientId) {
this.clientId = clientId;
return this;
}
/**
* At this point this is only changed on testcases
* The bridge shouldn't be sending blocking anyways
@ -653,6 +675,9 @@ public final class BridgeConfiguration implements Serializable {
builder.add(CONCURRENCY, getConcurrency());
builder.add(CONFIGURATION_MANAGED, isConfigurationManaged());
builder.add(PENDING_ACK_TIMEOUT, getPendingAckTimeout());
if (getClientId() != null) {
builder.add(CLIENT_ID, getClientId());
}
// complex fields (only serialize if value is not null)
@ -749,6 +774,7 @@ public final class BridgeConfiguration implements Serializable {
result = prime * result + concurrency;
result = prime * result + (int) (pendingAckTimeout ^ (pendingAckTimeout >>> 32));
result = prime * result + (configurationManaged ? 1231 : 1237);
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
return result;
}
@ -838,6 +864,11 @@ public final class BridgeConfiguration implements Serializable {
return false;
if (configurationManaged != other.configurationManaged)
return false;
if (clientId == null) {
if (other.clientId != null)
return false;
} else if (!clientId.equals(other.clientId))
return false;
return true;
}
@ -886,7 +917,8 @@ public final class BridgeConfiguration implements Serializable {
DataConstants.SIZE_BYTE + // routingType
transformerSize +
staticConnectorSize +
BufferHelper.sizeOfNullableLong(pendingAckTimeout);
BufferHelper.sizeOfNullableLong(pendingAckTimeout) +
BufferHelper.sizeOfNullableString(clientId);
return size;
}
@ -936,6 +968,7 @@ public final class BridgeConfiguration implements Serializable {
buffer.writeInt(0);
}
buffer.writeNullableLong(pendingAckTimeout);
buffer.writeNullableString(clientId);
}
public void decode(ActiveMQBuffer buffer) {
@ -982,5 +1015,8 @@ public final class BridgeConfiguration implements Serializable {
if (buffer.readable()) {
pendingAckTimeout = buffer.readNullableLong();
}
if (buffer.readable()) {
clientId = buffer.readNullableString();
}
}
}

View File

@ -83,6 +83,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
private int clusterNotificationAttempts = ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts();
private String clientId;
public ClusterConnectionConfiguration() {
}
@ -368,6 +370,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
public String getClientId() {
return clientId;
}
public ClusterConnectionConfiguration setClientId(String clientId) {
this.clientId = clientId;
return this;
}
/**
* This method will match the configuration and return the proper TransportConfiguration for the Configuration
*/
@ -451,6 +462,7 @@ public final class ClusterConnectionConfiguration implements Serializable {
temp = Double.doubleToLongBits(retryIntervalMultiplier);
result = prime * result + (int) (temp ^ (temp >>> 32));
result = prime * result + ((staticConnectors == null) ? 0 : staticConnectors.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
return result;
}
@ -552,6 +564,13 @@ public final class ClusterConnectionConfiguration implements Serializable {
} else if (!staticConnectors.equals(other.staticConnectors)) {
return false;
}
if (clientId == null) {
if (other.clientId != null) {
return false;
}
} else if (!clientId.equals(other.clientId)) {
return false;
}
return true;
}
@ -581,6 +600,7 @@ public final class ClusterConnectionConfiguration implements Serializable {
", minLargeMessageSize=" + minLargeMessageSize +
", clusterNotificationInterval=" + clusterNotificationInterval +
", clusterNotificationAttempts=" + clusterNotificationAttempts +
", clientId=" + clientId +
'}';
}
}

View File

@ -2407,7 +2407,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
ClusterConnectionConfiguration config = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setDuplicateDetection(duplicateDetection).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setAllowDirectConnectionsOnly(allowDirectConnectionsOnly).setClusterNotificationInterval(clusterNotificationInterval).setClusterNotificationAttempts(clusterNotificationAttempts);
String clientId = getString(e, "client-id", null, NO_CHECK);
ClusterConnectionConfiguration config = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setDuplicateDetection(duplicateDetection).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setAllowDirectConnectionsOnly(allowDirectConnectionsOnly).setClusterNotificationInterval(clusterNotificationInterval).setClusterNotificationAttempts(clusterNotificationAttempts).setClientId(clientId);
if (discoveryGroupName == null) {
config.setStaticConnectors(staticConnectorNames);
@ -2482,6 +2484,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
long pendingAckTimeout = getLong(brNode, "pending-ack-timeout", ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout(), GT_ZERO);
String clientId = getString(brNode, "client-id", null, NO_CHECK);
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
String password = null;
@ -2549,7 +2553,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
.setPassword(password)
.setRoutingType(routingType)
.setConcurrency(concurrency)
.setPendingAckTimeout(pendingAckTimeout);
.setPendingAckTimeout(pendingAckTimeout)
.setClientId(clientId);
if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);

View File

@ -623,7 +623,7 @@ public class ClusterManager implements ActiveMQComponent {
logger.debug("{} Starting a Discovery Group Cluster Connection, name={}, dg={}", this, config.getDiscoveryGroupName(), dg);
}
clusterConnection = new ClusterConnectionImpl(this, dg, connector, SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null ? config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterConnection = new ClusterConnectionImpl(this, dg, connector, SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null ? config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts(), config.getClientId());
clusterController.addClusterConnection(clusterConnection.getName(), dg, config, connector);
} else {
@ -633,7 +633,7 @@ public class ClusterManager implements ActiveMQComponent {
logger.debug("{} defining cluster connection towards {}", this, Arrays.toString(tcConfigs));
}
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, SimpleString.of(config.getName()), SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, SimpleString.of(config.getName()), SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts(), config.getClientId());
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config, connector);
}

View File

@ -1040,9 +1040,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return;
}
// Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1);
session = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1, configuration.getClientId());
session.getProducerCreditManager().setCallback(BridgeImpl.this);
sessionConsumer = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1);
sessionConsumer = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1, configuration.getClientId());
}
if (configuration.getForwardingAddress() != null) {

View File

@ -123,7 +123,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
final MessageFlowRecord flowRecord,
final TransportConfiguration connector,
final String storeAndForwardPrefix,
final StorageManager storageManager) throws ActiveMQException {
final StorageManager storageManager,
final String clientId) throws ActiveMQException {
super(targetLocator, new BridgeConfiguration()
.setName(name == null ? null : name.toString())
.setInitialConnectAttempts(initialConnectAttempts)
@ -138,7 +139,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
.setUser(user)
.setPassword(password)
.setTransformerConfiguration(transformer)
.setRoutingType(ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())), nodeUUID, queue, executor, scheduledExecutor, server);
.setRoutingType(ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()))
.setClientId(clientId), nodeUUID, queue, executor, scheduledExecutor, server);
this.discoveryLocator = discoveryLocator;

View File

@ -183,6 +183,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
private boolean splitBrainDetection;
private final String clientId;
/** For tests only */
public ServerLocatorInternal getServerLocator() {
@ -220,7 +222,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final String clusterPassword,
final boolean allowDirectConnectionsOnly,
final long clusterNotificationInterval,
final int clusterNotificationAttempts) throws Exception {
final int clusterNotificationAttempts,
final String clientId) throws Exception {
this.nodeManager = nodeManager;
this.connector = connector;
@ -303,6 +306,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
}
this.storeAndForwardPrefix = server.getInternalNamingPrefix() + SN_PREFIX;
this.clientId = clientId;
}
public ClusterConnectionImpl(final ClusterManager manager,
@ -335,7 +340,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final String clusterPassword,
final boolean allowDirectConnectionsOnly,
final long clusterNotificationInterval,
final int clusterNotificationAttempts) throws Exception {
final int clusterNotificationAttempts,
final String clientId) throws Exception {
this.nodeManager = nodeManager;
this.connector = connector;
@ -403,6 +409,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
this.manager = manager;
this.storeAndForwardPrefix = server.getInternalNamingPrefix() + SN_PREFIX;
this.clientId = clientId;
}
@Override
@ -908,7 +916,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix, server.getStorageManager());
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix, server.getStorageManager(), clientId);
targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");

View File

@ -1570,6 +1570,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="client-id" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the identifier to use for the bridge connection; helps with identifying the connection on the remote broker
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="discovery-type" maxOccurs="1" minOccurs="1"/>
</xsd:all>
@ -2598,6 +2606,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="client-id" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the identifier to use for the cluster connection; helps with identifying the connection on the remote broker
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="discovery-type" maxOccurs="1" minOccurs="0"/>
</xsd:all>

View File

@ -94,6 +94,7 @@ public class BridgeConfigurationEncodingTest {
final ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.ANYCAST;
final long pendingAckTimeout = 5L;
final String staticConnector = "ii";
final String clientId = "mm";
BridgeConfiguration configuration = new BridgeConfiguration()
.setName(name)
@ -120,7 +121,8 @@ public class BridgeConfigurationEncodingTest {
.setCallTimeout(callTimeout)
.setConcurrency(concurrency)
.setConfigurationManaged(configurationManaged)
.setRoutingType(routingType);
.setRoutingType(routingType)
.setClientId(clientId);
if (transformer) {
final String transformerClass = "jj";
@ -322,6 +324,11 @@ public class BridgeConfigurationEncodingTest {
data.readBytes(read, 0, 9);
assertArrayEquals(new byte[] {DataConstants.NOT_NULL, 0, 0, 0, 0, 0, 0, 0, 5}, read);
// clientId
read = new byte[9];
data.readBytes(read, 0, 9);
assertArrayEquals(new byte[] {DataConstants.NOT_NULL, 0, 0, 0, 2, 0, 109, 0, 109}, read);
assertEquals(0, data.readableBytes());
}
@ -360,7 +367,8 @@ public class BridgeConfigurationEncodingTest {
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setTransformerConfiguration(mytransformer)
.setStaticConnectors(List.of("tcp://localhost:61616"))
.setPendingAckTimeout(13);
.setPendingAckTimeout(13)
.setClientId("myClientID");
int encodeSize = configuration.getEncodeSize();
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(encodeSize);
@ -402,6 +410,7 @@ public class BridgeConfigurationEncodingTest {
assertEquals("prop3", properties.get("key3"));
assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getStaticConnectors());
assertEquals(configuration.getPendingAckTimeout(), persistedBridgeConfiguration.getPendingAckTimeout());
assertEquals(configuration.getClientId(), persistedBridgeConfiguration.getClientId());
}
@Test
@ -436,7 +445,8 @@ public class BridgeConfigurationEncodingTest {
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setTransformerConfiguration(mytransformer)
.setStaticConnectors(List.of("tcp://localhost:61616"))
.setPendingAckTimeout(13);
.setPendingAckTimeout(13)
.setClientId("myClientID");
int encodeSize = configuration.getEncodeSize();
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(encodeSize);
@ -475,6 +485,7 @@ public class BridgeConfigurationEncodingTest {
assertEquals(0, properties.size());
assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getStaticConnectors());
assertEquals(configuration.getPendingAckTimeout(), persistedBridgeConfiguration.getPendingAckTimeout());
assertEquals(configuration.getClientId(), persistedBridgeConfiguration.getClientId());
}
@Test
@ -506,7 +517,8 @@ public class BridgeConfigurationEncodingTest {
.setConfigurationManaged(true)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setStaticConnectors(List.of("tcp://localhost:61616"))
.setPendingAckTimeout(13);
.setPendingAckTimeout(13)
.setClientId("myClientID");
int encodeSize = configuration.getEncodeSize();
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(encodeSize);
@ -542,5 +554,6 @@ public class BridgeConfigurationEncodingTest {
assertNull(persistedBridgeConfiguration.getTransformerConfiguration());
assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getStaticConnectors());
assertEquals(configuration.getPendingAckTimeout(), persistedBridgeConfiguration.getPendingAckTimeout());
assertEquals(configuration.getClientId(), persistedBridgeConfiguration.getClientId());
}
}

View File

@ -74,6 +74,7 @@ public class BridgeConfigurationTest {
assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType());
assertEquals(1, bridgeConfiguration.getConcurrency());
assertEquals(321, bridgeConfiguration.getPendingAckTimeout());
assertEquals("myClientID", bridgeConfiguration.getClientId());
}
@Test
@ -202,6 +203,7 @@ public class BridgeConfigurationTest {
objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
objectBuilder.add(BridgeConfiguration.CONFIGURATION_MANAGED, true);
objectBuilder.add(BridgeConfiguration.PENDING_ACK_TIMEOUT, 321);
objectBuilder.add(BridgeConfiguration.CLIENT_ID, "myClientID");
return objectBuilder.build();
}

View File

@ -430,12 +430,14 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
assertEquals("dg1", bc.getDiscoveryGroupName());
assertEquals(568320, bc.getProducerWindowSize());
assertEquals(ComponentConfigurationRoutingType.PASS, bc.getRoutingType());
assertNull(bc.getClientId());
} else if (bc.getName().equals("bridge3")) {
assertEquals("bridge3", bc.getName());
assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
assertEquals("bridgeTransformerValue1", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey1"));
assertEquals("bridgeTransformerValue2", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey2"));
assertEquals(123456, bc.getPendingAckTimeout());
assertEquals("myClientID", bc.getClientId());
}
}
@ -454,6 +456,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
assertEquals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, ccc.getMessageLoadBalancingType());
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallTimeout(), ccc.getCallTimeout());
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallFailoverTimeout(), ccc.getCallFailoverTimeout());
assertEquals("myClientID", ccc.getClientId());
} else if (ccc.getName().equals("cluster-connection1")) {
assertEquals("cluster-connection1", ccc.getName());
assertEquals(321, ccc.getMinLargeMessageSize(), "clusterConnectionConf minLargeMessageSize");

View File

@ -86,7 +86,8 @@ public class ClusterConnectionImplMockTest extends ServerTestBase {
null, //final String clusterPassword,
true, //final boolean allowDirectConnectionsOnly,
0, //final long clusterNotificationInterval,
0 //final int clusterNotificationAttempts)
0, //final int clusterNotificationAttempts)
null
);
assertEquals(1, cci.allowableConnections.size());
@ -105,7 +106,7 @@ public class ClusterConnectionImplMockTest extends ServerTestBase {
ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));
try {
ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, null, true, 0, 0);
ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, null, true, 0, 0, null);
TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null);
cci.nodeUP(topologyMember, false);

View File

@ -267,6 +267,7 @@
<pending-ack-timeout>123456</pending-ack-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
<forwarding-address>bridge-forwarding-address2</forwarding-address>
<client-id>myClientID</client-id>
</bridge>
<bridge name="bridge4">
<queue-name>queue3</queue-name>
@ -453,6 +454,7 @@
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
<client-id>myClientID</client-id>
</cluster-connection>
</cluster-connections>
<broker-connections>

View File

@ -261,6 +261,7 @@
<producer-window-size>555k</producer-window-size>
<pending-ack-timeout>123456</pending-ack-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
<client-id>myClientID</client-id>
</bridge>
<bridge name="bridge4">
<queue-name>queue3</queue-name>
@ -340,6 +341,7 @@
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
<client-id>myClientID</client-id>
</cluster-connection>
</cluster-connections>
<broker-connections>

View File

@ -53,6 +53,7 @@
<producer-window-size>555k</producer-window-size>
<pending-ack-timeout>123456</pending-ack-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
<client-id>myClientID</client-id>
</bridge>
<bridge name="bridge4">
<queue-name>queue3</queue-name>

View File

@ -51,5 +51,6 @@
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
<client-id>myClientID</client-id>
</cluster-connection>
</cluster-connections>

View File

@ -420,6 +420,7 @@ There can be zero or more cluster connections defined per Apache ActiveMQ Artemi
<notification-interval>1000</notification-interval>
<notification-attempts>2</notification-attempts>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<client-id></client-id>
</cluster-connection>
</cluster-connections>
----
@ -582,6 +583,11 @@ Here we have defined 2 servers that we know for sure will that at least one will
There may be many more servers in the cluster but these will;
be discovered via one of these connectors once an initial connection has been made.
client-id::
An optional identifier to use for the cluster connection.
This can help with identifying the connection on the remote broker (e.g. via the web console).
Default is empty (i.e. unset).
=== Cluster User Credentials
When creating connections between nodes of a cluster to form a cluster connection, Apache ActiveMQ Artemis uses a cluster user and cluster password which is defined in `broker.xml`:

View File

@ -65,6 +65,7 @@ Let's kick off with an example:
<!-- alternative to static-connectors
<discovery-group-ref discovery-group-name="bridge-discovery-group"/>
-->
<client-id>myClientID</client-id>
</bridge>
----
@ -215,3 +216,8 @@ Pick either this or `static-connector` to connect the bridge to the target serve
The `discovery-group-ref` element has one attribute - `discovery-group-name`.
This attribute points to a `discovery-group` defined elsewhere.
For more information about what discovery-groups are and how to configure them, please see xref:clusters.adoc#clusters[Discovery Groups].
client-id::
An optional identifier to use for the bridge connection.
This can help with identifying the connection on the remote broker (e.g. via the web console).
Default is empty (i.e. unset).

View File

@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -67,6 +68,8 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionField;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@ -88,6 +91,8 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
@ -554,6 +559,43 @@ public class BridgeTest extends ActiveMQTestBase {
assertTrue(serverLocator.getClientSessionFactoryCount() <= 1);
}
@TestTemplate
public void testBridgeClientID() throws Exception {
final String clientId = RandomUtil.randomString();
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
List<String> connectorConfig = List.of(server1tc.getName());
server0.getConfiguration().setBridgeConfigurations(List.of(new BridgeConfiguration().setName(getName()).setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(10).setReconnectAttempts(250).setStaticConnectors(connectorConfig).setClientId(clientId)));
server0.getConfiguration().setQueueConfigs(List.of(QueueConfiguration.of(queueName0).setAddress(testAddress)));
server1.getConfiguration().setQueueConfigs(List.of(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
server1.start();
server0.start();
Wait.assertTrue(()-> server0.getClusterManager().getBridges().get(getName()).isConnected(), 2000, 25);
String connectionsAsJsonString = server1.getActiveMQServerControl().listConnections(createJsonFilter(ConnectionField.CLIENT_ID.getName(), ActiveMQFilterPredicate.Operation.EQUALS.toString(), clientId), 1, 1);
JsonObject connectionsAsJsonObject = JsonUtil.readJsonObject(connectionsAsJsonString);
JsonArray array = (JsonArray) connectionsAsJsonObject.get("data");
assertEquals(1, array.size(), "number of connections returned from query");
}
/**
* @param server1Params
*/

View File

@ -16,20 +16,26 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionField;
import org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ClusterConnectionConfigTest extends ClusterTestBase {
@Override
@ -103,4 +109,30 @@ public class ClusterConnectionConfigTest extends ClusterTestBase {
}
}
@Test
public void testClusterConnectionClientId() throws Exception {
final String clientId = RandomUtil.randomString();
setupCluster(MessageLoadBalancingType.ON_DEMAND, (ClusterConnectionConfiguration cfg) -> {
cfg.setClientId(clientId);
});
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
String connectionsAsJsonString = servers[1].getActiveMQServerControl().listConnections(createJsonFilter(ConnectionField.CLIENT_ID.getName(), ActiveMQFilterPredicate.Operation.EQUALS.toString(), clientId), 1, 1);
JsonObject connectionsAsJsonObject = JsonUtil.readJsonObject(connectionsAsJsonString);
JsonArray array = (JsonArray) connectionsAsJsonObject.get("data");
assertEquals(1, array.size(), "number of connections returned from query");
}
}

View File

@ -63,7 +63,8 @@ public class BridgeConfigurationStorageTest extends StorageManagerTestBase {
.setProducerWindowSize(123123)
.setConfirmationWindowSize(123123)
.setStaticConnectors(Arrays.asList("connector1", "connector2"))
.setTransformerConfiguration(mytransformer);
.setTransformerConfiguration(mytransformer)
.setClientId("myClientID");
journal.storeBridgeConfiguration(new PersistedBridgeConfiguration(configuration));
@ -87,6 +88,7 @@ public class BridgeConfigurationStorageTest extends StorageManagerTestBase {
assertEquals("prop1", properties.get("key1"));
assertEquals("prop2", properties.get("key2"));
assertEquals("prop3", properties.get("key3"));
assertEquals(configuration.getClientId(), persistedBridgeConfiguration.getBridgeConfiguration().getClientId());
}
@TestTemplate