ARTEMIS-4754 Structure the names used for federation internal queues

When creating internal temporary queues for the federation control links and the
events link we should use a structured naming convention to ease in configuring
security for the federation user where all internal names fall under a root prefix
which can be used to grant read and write access for the federation user. This
change allows security on the wildcarded address "$ACTIVEMQ_ARTEMIS_FEDERATION.#".
This change also includes some further restrictions added to federation resources
and adds support for wildcarding '$' prefixed addresses.
This commit is contained in:
Timothy Bish 2024-05-01 16:39:23 -04:00 committed by Robbie Gemmell
parent 8b73335b46
commit d7a7116a4c
9 changed files with 411 additions and 34 deletions

View File

@ -263,15 +263,32 @@ public class AMQPSessionCallback implements SessionCallback {
}
public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
createTemporaryQueue(queueName, queueName, routingType, null);
createTemporaryQueue(queueName, queueName, routingType, null, null);
}
public void createTemporaryQueue(SimpleString queueName, RoutingType routingType, Integer maxConsumers) throws Exception {
createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers);
}
public void createTemporaryQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
SimpleString filter) throws Exception {
createTemporaryQueue(address, queueName, routingType, filter, null);
}
public void createTemporaryQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
SimpleString filter,
Integer maxConsumers) throws Exception {
try {
serverSession.createQueue(new QueueConfiguration(queueName).setAddress(address).setRoutingType(routingType).setFilterString(filter).setTemporary(true).setDurable(false));
serverSession.createQueue(new QueueConfiguration(queueName).setAddress(address)
.setRoutingType(routingType)
.setFilterString(filter)
.setTemporary(true)
.setDurable(false)
.setMaxConsumers(maxConsumers));
} catch (ActiveMQSecurityException se) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
}

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
@ -170,6 +174,50 @@ public abstract class AMQPFederation implements FederationInternal {
}
}
/**
* Performs the prefixing for federation events queues that places the events queues into
* the name-space of federation related internal queues.
*
* @param suffix
* A suffix to append to the federation events link (normally the AMQP link name).
*
* @return the full internal queue name to use for the given suffix.
*/
String prefixEventsLinkQueueName(String suffix) {
final StringBuilder builder = new StringBuilder();
final char delimiter = getWildcardConfiguration().getDelimiter();
builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
.append(delimiter)
.append(FEDERATION_EVENTS_LINK_PREFIX)
.append(delimiter)
.append(suffix);
return builder.toString();
}
/**
* Performs the prefixing for federation control queue name that places the queues
* into the name-space of federation related internal queues.
*
* @param suffix
* A suffix to append to the federation control link (normally the AMQP link name).
*
* @return the full internal queue name to use for the given suffix.
*/
String prefixControlLinkQueueName(String suffix) {
final StringBuilder builder = new StringBuilder();
final char delimiter = getWildcardConfiguration().getDelimiter();
builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
.append(delimiter)
.append(FEDERATION_CONTROL_LINK_PREFIX)
.append(delimiter)
.append(suffix);
return builder.toString();
}
/**
* Adds a remote linked closed event interceptor that can intercept the closed event and
* if it returns true indicate that the close has been handled and that no further action

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -24,11 +26,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
/**
@ -42,6 +46,8 @@ public class AMQPFederationCommandDispatcher implements SenderController {
private final AMQPSessionCallback session;
private final ActiveMQServer server;
private String controlAddress;
AMQPFederationCommandDispatcher(Sender sender, ActiveMQServer server, AMQPSessionCallback session) {
this.session = session;
this.sender = sender;
@ -105,33 +111,40 @@ public class AMQPFederationCommandDispatcher implements SenderController {
@Override
public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
final Connection protonConnection = senderContext.getSender().getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();
final AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
}
// Get the dynamically generated name to use for local creation of a matching temporary
// queue that we will send control message to and the broker will dispatch as remote
// credit is made available.
final SimpleString queueName = SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
controlAddress = federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
try {
session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
session.createTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()), RoutingType.ANYCAST, 1);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
return (Consumer) session.createSender(senderContext, queueName, null, false);
return (Consumer) session.createSender(senderContext, SimpleString.toSimpleString(getControlLinkAddress()), null, false);
}
@Override
public void close() throws Exception {
// Make a best effort to remove the temporary queue used for control commands on close.
final SimpleString queueName = SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
try {
session.removeTemporaryQueue(queueName);
session.removeTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()));
} catch (Exception e) {
// Ignored as the temporary queue should be removed on connection termination.
}
}
private String getControlLinkAddress() {
return sender.getRemoteTarget().getAddress();
return controlAddress;
}
}

View File

@ -30,10 +30,39 @@ public final class AMQPFederationConstants {
/**
* Address used by a remote broker instance to validate that an incoming federation connection
* has access right to perform federation operations. The user that connects to the AMQP federation
* endpoint and attempt to create the control link must have write access to this address.
* has access rights to perform federation operations. The user that connects to the AMQP federation
* endpoint and attempts to create the control link must have write access to this address and any
* address prefixed by this value.
*
* When securing a federation user account the user must have read and write permissions to addresses
* under this prefix using the broker defined delimiter, this include the ability to create non-durable
* resources.
*
* <pre>
* $ACTIVEMQ_ARTEMIS_FEDERATION.#;
* </pre>
*/
public static final String FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS = "$ACTIVEMQ_ARTEMIS_FEDERATION";
public static final String FEDERATION_BASE_VALIDATION_ADDRESS = "$ACTIVEMQ_ARTEMIS_FEDERATION";
/**
* The prefix value added when creating a federation control link beyond the initial portion of the
* validation address prefix. Links for command and control of federation operations follow the form:
*
* <pre>
* $ACTIVEMQ_ARTEMIS_FEDERATION.control.&lt;unique-id&gt;
* </pre>
*/
public static final String FEDERATION_CONTROL_LINK_PREFIX = "control";
/**
* The prefix value added when creating a federation events links beyond the initial portion of the
* validation address prefix. Links for federation events follow the form:
*
* <pre>
* $ACTIVEMQ_ARTEMIS_FEDERATION.events.&lt;unique-id&gt;
* </pre>
*/
public static final String FEDERATION_EVENTS_LINK_PREFIX = "events";
/**
* A desired capability added to the federation control link that must be offered

View File

@ -68,6 +68,8 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
private final Set<String> addressWatches = new HashSet<>();
private final Set<String> queueWatches = new HashSet<>();
private String eventsAddress;
public AMQPFederationEventDispatcher(AMQPFederation federation, AMQPSessionCallback session, Sender sender) {
this.session = session;
this.sender = sender;
@ -76,7 +78,7 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
}
private String getEventsLinkAddress() {
return sender.getName();
return eventsAddress;
}
/**
@ -100,8 +102,7 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
final Connection protonConnection = senderContext.getSender().getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();
AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
final AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
@ -115,7 +116,7 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
// Create a temporary queue using the unique link name which is where events will
// be sent to so that they can be held until credit is granted by the remote.
final SimpleString queueName = SimpleString.toSimpleString(sender.getName());
eventsAddress = federation.prefixEventsLinkQueueName(sender.getName());
if (sender.getLocalState() != EndpointState.ACTIVE) {
// Indicate that event link capabilities is supported.
@ -131,11 +132,11 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
throw new ActiveMQAMQPInternalErrorException("Remote Terminus did not arrive as dynamic node: " + remoteTerminus);
}
remoteTerminus.setAddress(queueName.toString());
remoteTerminus.setAddress(getEventsLinkAddress());
}
try {
session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
session.createTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()), RoutingType.ANYCAST, 1);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
@ -145,18 +146,16 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
server.registerBrokerPlugin(this); // Start listening for bindings and consumer events.
return (Consumer) session.createSender(senderContext, queueName, null, false);
return (Consumer) session.createSender(senderContext, SimpleString.toSimpleString(getEventsLinkAddress()), null, false);
}
@Override
public void close() {
// Make a best effort to remove the temporary queue used for event messages on close.
final SimpleString queueName = SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
server.unRegisterBrokerPlugin(this);
try {
session.removeTemporaryQueue(queueName);
session.removeTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()));
} catch (Exception e) {
// Ignored as the temporary queue should be removed on connection termination.
}

View File

@ -79,7 +79,7 @@ import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
@ -472,7 +472,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
private void handleFederationControlLinkOpened(AMQPSessionContext protonSession, Receiver receiver) throws Exception {
try {
try {
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS), CheckType.SEND, getSecurityAuth());
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_BASE_VALIDATION_ADDRESS), CheckType.SEND, getSecurityAuth());
} catch (ActiveMQSecurityException e) {
throw new ActiveMQAMQPSecurityException(
"User does not have permission to attach to the federation control address");

View File

@ -36,6 +36,10 @@ public class Match<T> {
private static final String DOT_REPLACEMENT = "\\.";
private static final String DOLLAR = "$";
private static final String DOLLAR_REPLACEMENT = "\\$";
private final String match;
private final Pattern pattern;
@ -75,6 +79,7 @@ public class Match<T> {
actMatch = actMatch.replace(wildcardConfiguration.getDelimiterString() + wildcardConfiguration.getAnyWordsString(), wildcardConfiguration.getAnyWordsString());
}
actMatch = actMatch.replace(Match.DOT, Match.DOT_REPLACEMENT);
actMatch = actMatch.replace(Match.DOLLAR, Match.DOLLAR_REPLACEMENT);
actMatch = actMatch.replace(wildcardConfiguration.getSingleWordString(), String.format(WORD_WILDCARD_REPLACEMENT_FORMAT, Pattern.quote(wildcardConfiguration.getDelimiterString())));
if (direct) {

View File

@ -78,6 +78,29 @@ public class MatchTest {
Assert.assertFalse(predicate.test("testing.A"));
Assert.assertFalse(predicate.test("test"));
Assert.assertFalse(predicate.test("test.A.B"));
}
@Test
public void testDollarMatchingDirectTrue() {
final Pattern pattern = Match.createPattern("$test.#", new WildcardConfiguration(), true);
final Predicate<String> predicate = pattern.asPredicate();
Assert.assertTrue(predicate.test("$test.A"));
Assert.assertTrue(predicate.test("$test.A.B"));
Assert.assertFalse(predicate.test("$testing.A"));
Assert.assertFalse(predicate.test("$test"));
}
@Test
public void testDollarMatchingDirectFalse() {
final Pattern pattern = Match.createPattern("$test.#", new WildcardConfiguration(), false);
final Predicate<String> predicate = pattern.asPredicate();
Assert.assertTrue(predicate.test("$test"));
Assert.assertTrue(predicate.test("$test.A"));
Assert.assertTrue(predicate.test("$test.A.B"));
Assert.assertFalse(predicate.test("$testing.A"));
}
}

View File

@ -28,7 +28,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
@ -62,10 +64,9 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
@ -158,21 +159,23 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
federationConfiguration.put(IGNORE_QUEUE_CONSUMER_PRIORITIES, AMQP_INGNORE_CONSUMER_PRIORITIES);
federationConfiguration.put(AmqpSupport.TUNNEL_CORE_MESSAGES, AMQP_TUNNEL_CORE_MESSAGES);
final String controlLinkAddress = "test-control-address";
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
.withName(allOf(containsString("Federation"), containsString("myFederation")))
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.withName(allOf(containsString("federation-"), containsString("myFederation")))
.withProperty(FEDERATION_CONFIGURATION.toString(), federationConfiguration)
.withTarget().withDynamic(true)
.withCapabilities("temporary-topic")
.and()
.respond()
.withTarget().withAddress("test-control-address")
.withTarget().withAddress(controlLinkAddress)
.and()
.withOfferedCapabilities(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.start();
final URI remoteURI = peer.getServerURI();
@ -193,7 +196,9 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Wait.assertTrue(() -> server.locateQueue("test-control-address") != null);
Wait.assertTrue(() -> server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
"." + FEDERATION_CONTROL_LINK_PREFIX +
"." + controlLinkAddress) != null);
}
}
@ -203,7 +208,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond();
peer.expectAttach().ofSender().withDesiredCapability(FEDERATION_CONTROL_LINK.toString()).respond();
peer.expectConnectionToDrop();
peer.start();
@ -645,7 +650,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
@Test(timeout = 20000)
public void testControlLinkPassesConnectAttemptWhenUserHasPrivledges() throws Exception {
enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
@ -665,9 +670,31 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 20000)
public void testControlAndEventsLinksPassesConnectAttemptWhenUserHasPrivledges() throws Exception {
enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS + ".#");
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, getTestName(), true, fullUser, fullPass, true, true);
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
logger.info("Test stopped");
}
}
@Test(timeout = 20000)
public void testControlLinkRefusesConnectAttemptWhenUseDoesNotHavePrivledgesForControlAddress() throws Exception {
enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
@ -773,6 +800,222 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 20000)
public void testControlLinkSenderQueueCreatedWithMaxConsumersOfOne() throws Exception {
final String controlLinkAddress = "test-control-address";
final String federationControlSenderAddress = FEDERATION_BASE_VALIDATION_ADDRESS +
"." + FEDERATION_CONTROL_LINK_PREFIX +
"." + controlLinkAddress;
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.withName(allOf(containsString("federation-"), containsString("myFederation")))
.withTarget().withDynamic(true)
.withCapabilities("temporary-topic")
.and()
.respond()
.withTarget().withAddress(controlLinkAddress)
.and()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement("myFederation");
amqpConnection.addElement(federation);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Wait.assertTrue(() -> server.locateQueue(federationControlSenderAddress) != null);
// Try and bind to the control address which should be rejected as the queue
// was created with max consumers of one.
peer.expectAttach().ofSender()
.withName("test-control-link-suspect")
.withNullSource();
peer.expectDetach().withClosed(true)
.withError(AmqpError.INTERNAL_ERROR.toString());
peer.remoteAttach().ofReceiver()
.withName("test-control-link-suspect")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withTarget().also()
.withSource().withAddress(federationControlSenderAddress)
.also()
.now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test(timeout = 20000)
public void testEventSenderLinkFromTargetUsesNamespacedDynamicQueue() throws Exception {
final String federationControlLinkName = "federation-test";
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
peer.queueClientSaslAnonymousConnect();
peer.remoteOpen().queue();
peer.expectOpen();
peer.remoteBegin().queue();
peer.expectBegin();
peer.remoteAttach().ofSender()
.withName(federationControlLinkName)
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().also()
.withTarget().withDynamic(true)
.withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withLifetimePolicyOfDeleteOnClose()
.withCapabilities("temporary-topic")
.also()
.queue();
peer.expectAttach().ofReceiver()
.withName(federationControlLinkName)
.withTarget()
.withAddress(notNullValue())
.also()
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
peer.expectFlow();
final String federationEventsSenderLinkName = "events-receiver-test";
peer.remoteAttach().ofReceiver()
.withName(federationEventsSenderLinkName)
.withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withTarget().also()
.withSource().withDynamic(true)
.withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withLifetimePolicyOfDeleteOnClose()
.withCapabilities("temporary-topic")
.also()
.queue();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().ofSender()
.withName(federationEventsSenderLinkName)
.withSource()
.withAddress(notNullValue())
.also()
.withOfferedCapability(FEDERATION_EVENT_LINK.toString());
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// The events receiver from the remote should trigger a temporary queue to be created on
// the server to allow sends of events beyond currently available credit.
Wait.assertTrue(() -> server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
"." + FEDERATION_EVENTS_LINK_PREFIX +
"." + federationEventsSenderLinkName) != null);
server.stop();
}
}
@Test(timeout = 20000)
public void testEventsLinkAtTargetIsCreatedWithMaxConsumersOfOne() throws Exception {
final String federationControlLinkName = "federation-test";
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
peer.queueClientSaslAnonymousConnect();
peer.remoteOpen().queue();
peer.expectOpen();
peer.remoteBegin().queue();
peer.expectBegin();
peer.remoteAttach().ofSender()
.withName(federationControlLinkName)
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().also()
.withTarget().withDynamic(true)
.withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withLifetimePolicyOfDeleteOnClose()
.withCapabilities("temporary-topic")
.also()
.queue();
peer.expectAttach().ofReceiver()
.withName(federationControlLinkName)
.withTarget()
.withAddress(notNullValue())
.also()
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
peer.expectFlow();
final String federationEventsSenderLinkName = "events-receiver-test";
final String federationEventsSenderAddress = FEDERATION_BASE_VALIDATION_ADDRESS +
"." + FEDERATION_EVENTS_LINK_PREFIX +
"." + federationEventsSenderLinkName;
peer.remoteAttach().ofReceiver()
.withName(federationEventsSenderLinkName)
.withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withTarget().also()
.withSource().withDynamic(true)
.withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withLifetimePolicyOfDeleteOnClose()
.withCapabilities("temporary-topic")
.also()
.queue();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().ofSender()
.withName(federationEventsSenderLinkName)
.withSource()
.withAddress(notNullValue())
.also()
.withOfferedCapability(FEDERATION_EVENT_LINK.toString());
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// The events receiver from the remote should trigger a temporary queue to be created on
// the server to allow sends of events beyond currently available credit.
Wait.assertTrue(() -> server.locateQueue(federationEventsSenderAddress) != null);
// Try and bind to the events address which should be rejected as the queue
// was created with max consumers of one.
peer.expectAttach().ofSender()
.withName("test-events-link-suspect")
.withNullSource();
peer.expectDetach().withClosed(true)
.withError(AmqpError.INTERNAL_ERROR.toString());
peer.remoteAttach().ofReceiver()
.withName("test-events-link-suspect")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withTarget().also()
.withSource().withAddress(federationEventsSenderAddress)
.also()
.now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
server.stop();
}
}
// Use these methods to script the initial handshake that a broker that is establishing
// a federation connection with a remote broker instance would perform.