ARTEMIS-4658 Prevent reflections when using dual address federation

When federation is configured in two directions between nodes for an address
the message can reflect from one node to another if max hops is not set or not
set correctly and in some federation topologies the max hops value can't solve
the issue and still result in a working configuration. This reflection should
be prevented at the federation consumer level for address consumers.
This commit is contained in:
Timothy Bish 2024-02-28 11:03:59 -05:00 committed by Robbie Gemmell
parent 723c11ac15
commit fd5d9b9ad0
7 changed files with 340 additions and 26 deletions

View File

@ -238,7 +238,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
ttl = 0;
}
String id = server.getConfiguration().getName();
String id = server.getNodeID().toString();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, saslFactory, null, outgoing);

View File

@ -62,11 +62,13 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@ -274,18 +276,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
source.setCapabilities(AmqpSupport.TOPIC_CAPABILITY);
}
final Map<Symbol, Object> filtersMap = new HashMap<>();
filtersMap.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
if (consumerInfo.getFilterString() != null && !consumerInfo.getFilterString().isEmpty()) {
final AmqpJmsSelectorFilter jmsFilter = new AmqpJmsSelectorFilter(consumerInfo.getFilterString());
filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
}
source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES, DEFAULT_OUTCOMES.length));
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
source.setAddress(address);
if (consumerInfo.getFilterString() != null && !consumerInfo.getFilterString().isEmpty()) {
final AmqpJmsSelectorFilter jmsFilter = new AmqpJmsSelectorFilter(consumerInfo.getFilterString());
final Map<Symbol, Object> filtersMap = new HashMap<>();
filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
source.setFilter(filtersMap);
}
source.setFilter(filtersMap);
target.setAddress(address);
@ -526,6 +530,11 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
if (message instanceof ICoreMessage) {
baseMessage = incrementCoreMessageHops((ICoreMessage) message);
// Add / Update the connection Id value to reflect the remote container Id so that the
// no-local filter of a federation address receiver directed back to the source of this
// message will exclude it as intended.
baseMessage.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, getConnection().getRemoteContainer());
} else {
baseMessage = incrementAMQPMessageHops((AMQPMessage) message);
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@ -117,19 +118,22 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
final long autoDeleteDelay = ((Number) addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0)).longValue();
final long autoDeleteMsgCount = ((Number) addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0)).longValue();
// An address receiver may opt to filter on things like max message hops so we must
// check for a filter here and apply it if it exists.
final Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
// An address receiver may opt to filter on things like max message hops or no local message
// reflection so we must check for a filter here and apply it if it exists.
final String jmsSelector = getJMSSelectorFromFilters(source);
final Map.Entry<Symbol, DescribedType> noLocal = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
selector = filter.getValue().getDescribed().toString();
try {
SelectorParser.parse(selector);
} catch (FilterException e) {
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
if (noLocal != null) {
String remoteContainerId = protonConnection.getRemoteContainer();
String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING + "<>'" + remoteContainerId + "'";
if (jmsSelector == null) {
selector = noLocalFilter;
} else {
selector = jmsSelector + " AND " + noLocalFilter;
}
} else {
selector = null;
selector = jmsSelector;
}
final SimpleString address = SimpleString.toSimpleString(source.getAddress());
@ -195,6 +199,26 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false);
}
@SuppressWarnings("unchecked")
private String getJMSSelectorFromFilters(Source source) throws ActiveMQAMQPException {
final Map.Entry<Symbol, DescribedType> jmsSelector = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
String selectorString = null;
// Validate the JMS selector if present.
if (jmsSelector != null) {
selectorString = jmsSelector.getValue().getDescribed().toString();
try {
SelectorParser.parse(selectorString);
} catch (FilterException e) {
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
}
}
return selectorString;
}
private static RoutingType getRoutingType(Source source) {
if (source != null) {
if (source.getCapabilities() != null) {

View File

@ -105,17 +105,17 @@ public final class AMQPFederationPolicySupport {
* to indicate no max hops for federated messages on an address.
*
* @param maxHops
* The max allowed number of hops before a message should stop cross federation links.
* The max allowed number of hops before a message should stop crossing federation links.
*
* @return the address filter string or null if not needed.
* @return the address filter string that should be applied (or null).
*/
public static String generateAddressFilter(int maxHops) {
if (maxHops <= 0) {
return null;
}
return "(\"m." + MESSAGE_HOPS_ANNOTATION.toString() +
"\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION.toString() +
return "(\"m." + MESSAGE_HOPS_ANNOTATION +
"\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION +
"\"<" + maxHops + ")" +
" AND " +
"(" + MESSAGE_HOPS_PROPERTY + " IS NULL OR " +

View File

@ -75,6 +75,8 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testBrokerContainerId() throws Exception {
final String containerId = server.getNodeID().toString();
AmqpClient client = createAmqpClient();
assertNotNull(client);
@ -82,7 +84,7 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
@Override
public void inspectOpenedResource(Connection connection) {
if (!BROKER_NAME.equals(connection.getRemoteContainer())) {
if (!containerId.equals(connection.getRemoteContainer())) {
markAsInvalid("Broker did not send the expected container ID");
}
}

View File

@ -100,6 +100,8 @@ import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
@ -109,6 +111,10 @@ import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestPeer;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Source;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
import org.apache.qpid.protonj2.test.driver.codec.transport.Attach;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
@ -388,6 +394,15 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
@Test(timeout = 20000)
public void testFederationCreatesAddressReceiverLinkForAddressMatchWithMaxHopsFilter() throws Exception {
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(true);
}
@Test(timeout = 20000)
public void testFederationCreatesAddressReceiverLinkForAddressMatchWithoutMaxHopsFilter() throws Exception {
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(false);
}
private void doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(boolean maxHops) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
@ -410,7 +425,11 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.setMaxHops(1);
if (maxHops) {
receiveFromAddress.setMaxHops(1);
} else {
receiveFromAddress.setMaxHops(0); // Disabled
}
receiveFromAddress.addToIncludes("test");
receiveFromAddress.setAutoDelete(true);
receiveFromAddress.setAutoDeleteDelay(10_000L);
@ -430,12 +449,24 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
final String expectedJMSFilter = generateAddressFilter(1);
final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
final Symbol noLocalKey = Symbol.valueOf("apache.org:no-local-filter:list");
final org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong noLocalCode =
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000003L);
final org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong jmsSelectorCode =
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
final Map<String, Object> selectors = new HashMap<>();
selectors.put(AmqpSupport.JMS_SELECTOR_KEY.toString(), new AmqpJmsSelectorFilter(expectedJMSFilter));
selectors.put(AmqpSupport.NO_LOCAL_NAME.toString(), new AmqpNoLocalFilter());
final Map<String, Object> expectedSourceProperties = new HashMap<>();
expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
final AtomicReference<Attach> capturedAttach = new AtomicReference<>();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
@ -444,7 +475,7 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), expectedSourceProperties)
.withSource().withJMSSelector(expectedJMSFilter).and()
.withCapture(attach -> capturedAttach.set(attach))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
@ -472,6 +503,28 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNotNull(capturedAttach.get());
final Source remoteSource = capturedAttach.get().getSource();
assertNotNull(remoteSource);
final Map<Symbol, Object> filtersMap = remoteSource.getFilter();
assertNotNull(filtersMap);
if (maxHops) {
assertTrue(filtersMap.containsKey(jmsSelectorKey));
final DescribedType jmsSelectorEntry = (DescribedType) filtersMap.get(jmsSelectorKey);
assertNotNull(jmsSelectorEntry);
assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
assertEquals(jmsSelectorEntry.getDescribed().toString(), expectedJMSFilter);
} else {
assertFalse(filtersMap.containsKey(jmsSelectorKey));
}
assertTrue(filtersMap.containsKey(noLocalKey));
final DescribedType noLocalEntry = (DescribedType) filtersMap.get(noLocalKey);
assertNotNull(noLocalEntry);
assertEquals(noLocalEntry.getDescriptor(), noLocalCode);
// Check that annotation for hops is present in the forwarded message.
final HeaderMatcher headerMatcher = new HeaderMatcher(true);
final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);

View File

@ -1101,4 +1101,230 @@ public class AMQPFederationServerToServerTest extends AmqpClientTestSupport {
assertEquals("green", receivedR.getStringProperty("color"));
}
}
@Test(timeout = 20000)
public void testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeAMQP() throws Exception {
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("AMQP");
}
@Test(timeout = 20000)
public void testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeCore() throws Exception {
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("CORE");
}
private void doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(String clientProtocol) throws Exception {
logger.info("Test started: {}", getTestName());
final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
localAddressPolicy.setName("local-test-policy");
localAddressPolicy.addToIncludes("test");
localAddressPolicy.setAutoDelete(false);
localAddressPolicy.setAutoDeleteDelay(-1L);
localAddressPolicy.setAutoDeleteMessageCount(-1L);
localAddressPolicy.setMaxHops(0); // Disable max hops
final AMQPFederationAddressPolicyElement remoteAddressPolicy = new AMQPFederationAddressPolicyElement();
remoteAddressPolicy.setName("remote-test-policy");
remoteAddressPolicy.addToIncludes("test");
remoteAddressPolicy.setAutoDelete(false);
remoteAddressPolicy.setAutoDeleteDelay(-1L);
remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
remoteAddressPolicy.setMaxHops(0); // Disable max hops
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalAddressPolicy(localAddressPolicy);
element.addRemoteAddressPolicy(remoteAddressPolicy);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
amqpConnection.setReconnectAttempts(10);// Limit reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
remoteServer.start();
server.start();
final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT);
final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT_REMOTE);
try (Connection connectionL = factoryLocal.createConnection();
Connection connectionR = factoryRemote.createConnection()) {
final Session sessionL = connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
final Session sessionR = connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = sessionL.createTopic("test");
final MessageConsumer consumerL = sessionL.createConsumer(topic);
final MessageConsumer consumerR = sessionR.createConsumer(topic);
final MessageProducer producerL = sessionL.createProducer(topic);
final MessageProducer producerR = sessionR.createProducer(topic);
final TextMessage messageFromL = sessionL.createTextMessage("local");
final TextMessage messageFromR = sessionR.createTextMessage("remote");
connectionL.start();
connectionR.start();
final SimpleString addressName = SimpleString.toSimpleString("test");
Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
Wait.assertTrue(() -> remoteServer.addressQuery(addressName).isExists());
assertNull(consumerL.receiveNoWait());
assertNull(consumerR.receiveNoWait());
// Captures state of JMS consumer and federation consumer attached on each node
Wait.assertTrue(() -> server.bindingQuery(addressName, false).getQueueNames().size() >= 2);
Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, false).getQueueNames().size() >= 2);
producerL.send(messageFromL);
final Message messageL1 = consumerL.receive();
final Message messageR1 = consumerR.receive();
assertNotNull(messageL1);
assertNotNull(messageR1);
assertTrue(messageL1 instanceof TextMessage);
assertTrue(messageR1 instanceof TextMessage);
assertEquals("local", ((TextMessage) messageL1).getText());
assertEquals("local", ((TextMessage) messageR1).getText());
producerR.send(messageFromR);
final Message messageL2 = consumerL.receive();
final Message messageR2 = consumerR.receive();
assertNotNull(messageL2);
assertNotNull(messageR2);
assertTrue(messageL2 instanceof TextMessage);
assertTrue(messageR2 instanceof TextMessage);
assertEquals("remote", ((TextMessage) messageL2).getText());
assertEquals("remote", ((TextMessage) messageR2).getText());
// Should be no other messages routed
assertNull(consumerL.receiveNoWait());
assertNull(consumerR.receiveNoWait());
}
}
@Test(timeout = 20000)
public void testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeAMQP() throws Exception {
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("AMQP");
}
@Test(timeout = 20000)
public void testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeCore() throws Exception {
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("CORE");
}
private void doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(String clientProtocol) throws Exception {
logger.info("Test started: {}", getTestName());
final AMQPFederationAddressPolicyElement localAddressPolicy1 = new AMQPFederationAddressPolicyElement();
localAddressPolicy1.setName("local-test-policy");
localAddressPolicy1.addToIncludes("test");
localAddressPolicy1.setAutoDelete(false);
localAddressPolicy1.setAutoDeleteDelay(-1L);
localAddressPolicy1.setAutoDeleteMessageCount(-1L);
localAddressPolicy1.setMaxHops(0); // Disable max hops
final AMQPFederationAddressPolicyElement localAddressPolicy2 = new AMQPFederationAddressPolicyElement();
localAddressPolicy2.setName("remote-test-policy");
localAddressPolicy2.addToIncludes("test");
localAddressPolicy2.setAutoDelete(false);
localAddressPolicy2.setAutoDeleteDelay(-1L);
localAddressPolicy2.setAutoDeleteMessageCount(-1L);
localAddressPolicy2.setMaxHops(0); // Disable max hops
final AMQPFederatedBrokerConnectionElement element1 = new AMQPFederatedBrokerConnectionElement();
element1.setName(getTestName() + ":1");
element1.addLocalAddressPolicy(localAddressPolicy1);
final AMQPFederatedBrokerConnectionElement element2 = new AMQPFederatedBrokerConnectionElement();
element2.setName(getTestName() + "2");
element2.addLocalAddressPolicy(localAddressPolicy2);
final AMQPBrokerConnectConfiguration amqpConnection1 =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
amqpConnection1.setReconnectAttempts(10);// Limit reconnects
amqpConnection1.addElement(element1);
final AMQPBrokerConnectConfiguration amqpConnection2 =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT);
amqpConnection2.setReconnectAttempts(10);// Limit reconnects
amqpConnection2.addElement(element2);
server.getConfiguration().addAMQPConnection(amqpConnection1);
remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
remoteServer.start();
server.start();
final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT);
final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT_REMOTE);
try (Connection connectionL = factoryLocal.createConnection();
Connection connectionR = factoryRemote.createConnection()) {
final Session sessionL = connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
final Session sessionR = connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = sessionL.createTopic("test");
final MessageConsumer consumerL = sessionL.createConsumer(topic);
final MessageConsumer consumerR = sessionR.createConsumer(topic);
final MessageProducer producerL = sessionL.createProducer(topic);
final MessageProducer producerR = sessionR.createProducer(topic);
final TextMessage messageFromL = sessionL.createTextMessage("local");
final TextMessage messageFromR = sessionR.createTextMessage("remote");
connectionL.start();
connectionR.start();
final SimpleString addressName = SimpleString.toSimpleString("test");
Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
Wait.assertTrue(() -> remoteServer.addressQuery(addressName).isExists());
assertNull(consumerL.receiveNoWait());
assertNull(consumerR.receiveNoWait());
// Captures state of JMS consumer and federation consumer attached on each node
Wait.assertTrue(() -> server.bindingQuery(addressName, false).getQueueNames().size() >= 2);
Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, false).getQueueNames().size() >= 2);
producerL.send(messageFromL);
final Message messageL1 = consumerL.receive();
final Message messageR1 = consumerR.receive();
assertNotNull(messageL1);
assertNotNull(messageR1);
assertTrue(messageL1 instanceof TextMessage);
assertTrue(messageR1 instanceof TextMessage);
assertEquals("local", ((TextMessage) messageL1).getText());
assertEquals("local", ((TextMessage) messageR1).getText());
producerR.send(messageFromR);
final Message messageL2 = consumerL.receive();
final Message messageR2 = consumerR.receive();
assertNotNull(messageL2);
assertNotNull(messageR2);
assertTrue(messageL2 instanceof TextMessage);
assertTrue(messageR2 instanceof TextMessage);
assertEquals("remote", ((TextMessage) messageL2).getText());
assertEquals("remote", ((TextMessage) messageR2).getText());
// Should be no other messages routed
assertNull(consumerL.receiveNoWait());
assertNull(consumerR.receiveNoWait());
}
}
}