ARTEMIS-5135 Use a sequence ID on address federation link names

Prevents unintended link stealing scenarios when link names match for a new
link whose name is in use by another link still awaiting a detach response.
This commit is contained in:
Timothy Bish 2024-10-28 15:22:07 -04:00 committed by Robbie Gemmell
parent 422446e033
commit a3aea7d9a0
3 changed files with 202 additions and 1 deletions

View File

@ -37,6 +37,7 @@ import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -100,6 +101,11 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private static final SimpleString MESSAGE_HOPS_ANNOTATION =
SimpleString.of(AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION.toString());
// Sequence ID value used to keep links that would otherwise have the same name from overlapping
// this generally occurs when a remote link detach is delayed and new demand is added before it
// arrives resulting in an unintended link stealing scenario in the proton engine.
private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();
private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
@ -255,7 +261,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private String generateLinkName() {
return "federation-" + federation.getName() +
"-address-receiver-" + consumerInfo.getAddress() +
"-" + federation.getServer().getNodeID();
"-" + federation.getServer().getNodeID() +
"-" + LINK_SEQUENCE_ID.incrementAndGet();
}
private void asyncCreateReceiver() {

View File

@ -4185,6 +4185,104 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testNewFederationConsumerCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
.withSource().withDynamic(true)
.and()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind()
.withTarget().withAddress("test-dynamic-events");
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes(getTestName());
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
peer.expectDetach().respond().afterDelay(40); // Defer the detach response for a bit
server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST));
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
// Create demand on the address which creates a federation consumer then let it close which
// should shut down that federation consumer.
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName()));
connection.start();
consumer.receiveNoWait();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
peer.expectDetach().respond();
// Create demand on the address which creates a federation consumer again quickly which
// can trigger a new consumer before the previous one was fully closed with a Detach
// response and get stuck because it will steal the link in proton and not be treated
// as a new attach for this consumer.
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName()));
connection.start();
consumer.receiveNoWait();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);

View File

@ -3995,6 +3995,102 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testNewFederationConsumerCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
.withSource().withDynamic(true)
.and()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind()
.withTarget().withAddress("test-dynamic-events");
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes(getTestName(), getTestName());
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
peer.expectDetach().respond().afterDelay(40); // Defer the detach response for a bit
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
.setAddress(getTestName())
.setAutoCreated(false));
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
// Create demand on the queue which creates a federation consumer then let it close which
// should shut down that federation consumer.
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName()));
connection.start();
consumer.receiveNoWait();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
peer.expectDetach().respond();
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName()));
connection.start();
consumer.receiveNoWait();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);