ARTEMIS-4799 Fix broker connection receiver attach handling

The receiver attach in broker connection does not wait for the remote
attach to arrive before creating the broker side receiver plumbing which
leads to the broker treating the remote sender as an anonymous relay when
it is not and should not be. Await the remote attach in response to the
attach sent by the broker connection to finish the link setup but use the
locally defined target address vs the remote to route the incoming messages.
This commit is contained in:
Timothy Bish 2024-06-06 14:53:02 -04:00 committed by Robbie Gemmell
parent 4fde81813c
commit fad1f5274c
3 changed files with 432 additions and 27 deletions

View File

@ -75,6 +75,8 @@ import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederati
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@ -84,6 +86,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMes
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
@ -284,13 +287,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
} else {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
}
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
}
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
}
}
@ -644,34 +644,105 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
protonRemotingConnection.getAmqpConnection().runLater(() -> {
if (receivers.contains(queue)) {
if (!receivers.add(queue)) {
logger.debug("Receiver for queue {} already exists, just giving up", queue);
return;
}
receivers.add(queue);
Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
Target target = new Target();
target.setAddress(queue.getAddress().toString());
receiver.setTarget(target);
Source source = new Source();
source.setAddress(queue.getAddress().toString());
receiver.setSource(source);
if (capabilities != null) {
source.setCapabilities(capabilities);
}
receiver.open();
protonRemotingConnection.getAmqpConnection().flush();
try {
sessionContext.addReceiver(receiver);
final String linkName = queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID();
final Receiver receiver = session.receiver(linkName);
final String queueAddress = queue.getAddress().toString();
final Target target = new Target();
target.setAddress(queueAddress);
final Source source = new Source();
source.setAddress(queueAddress);
if (capabilities != null) {
source.setCapabilities(capabilities);
}
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
receiver.setTarget(target);
receiver.setSource(source);
receiver.open();
final ScheduledFuture<?> openTimeoutTask;
final AtomicBoolean openTimedOut = new AtomicBoolean(false);
if (getConnectionTimeout() > 0) {
openTimeoutTask = server.getScheduledPool().schedule(() -> {
openTimedOut.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
}, getConnectionTimeout(), TimeUnit.MILLISECONDS);
} else {
openTimeoutTask = null;
}
// Await the remote attach before creating the broker receiver in order to impose a timeout
// on the attach response and then try and create the local server receiver context and finish
// the wiring.
receiver.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
try {
if (openTimeoutTask != null) {
openTimeoutTask.cancel(false);
}
if (openTimedOut.get()) {
return; // Timed out before remote attach arrived
}
if (receiver.getRemoteSource() != null) {
logger.trace("AMQP Broker Connection Receiver {} completed open", linkName);
} else {
logger.debug("AMQP Broker Connection Receiver {} rejected by remote", linkName);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.receiverLinkRefused(queueAddress), lastRetryCounter);
return;
}
sessionContext.addReceiver(receiver, (r, s) -> {
// Returns a customized server receiver context that will respect the locally initiated state
// when the receiver is initialized vs the remotely sent target as we want to ensure we attach
// the receiver to the address we set in our local state.
return new ProtonServerReceiverContext(sessionContext.getSessionSPI(),
sessionContext.getAMQPConnectionContext(),
sessionContext, receiver) {
@Override
public void initialize() throws Exception {
initialized = true;
address = SimpleString.of(target.getAddress());
defRoutingType = getRoutingType(target.getCapabilities(), address);
try {
// Check if the queue that triggered the attach still exists or has it been removed
// before the attach response arrived from the remote peer.
if (!sessionSPI.queueQuery(queue.getName(), queue.getRoutingType(), false).isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
}
} catch (ActiveMQAMQPException e) {
receivers.remove(queue);
throw e;
} catch (Exception e) {
logger.debug(e.getMessage(), e);
receivers.remove(queue);
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
flow();
}
};
});
} catch (Exception e) {
error(e);
}
});
} catch (Exception e) {
error(e);
}
protonRemotingConnection.getAmqpConnection().flush();
});
}

View File

@ -115,4 +115,6 @@ public interface ActiveMQAMQPProtocolMessageBundle {
@Message(id = 119028, value = "Malformed Federation event message: {}")
ActiveMQException malformedFederationEventMessage(String message);
@Message(id = 119029, value = "Receiver link refused for address {}")
ActiveMQAMQPIllegalStateException receiverLinkRefused(String address);
}

View File

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the Receiver functionality on AMQP broker connections
*/
@Timeout(20)
public class AMQPBrokerConnectionReceiverTest extends AmqpClientTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
@Override
protected ActiveMQServer createServer() throws Exception {
// Creates the broker used to make the outgoing connection. The port passed is for
// that brokers acceptor. The test server connected to by the broker binds to a random port.
return createServer(AMQP_PORT, false);
}
@Test
public void testBrokerConnectionCreatesReceiverOnRemote() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respondInKind();
peer.expectFlow();
peer.start();
final URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
element.setName(getTestName());
element.setMatchAddress("test");
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);
amqpConnection.addElement(element);
amqpConnection.setAutostart(true);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
peer.waitForScriptToComplete();
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete();
peer.close();
}
}
@Test
public void testIncomingMessageWithNoToFieldArrivesOnConfiguredAddress() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.start();
final URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
element.setName(getTestName());
element.setMatchAddress("test");
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);
amqpConnection.addElement(element);
amqpConnection.setAutostart(true);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
.withTarget().withAddress("test").and()
.respondInKind();
peer.expectFlow();
peer.remoteTransfer().withDeliveryId(1)
.withBody().withString("test-body").also()
.queue();
peer.expectDisposition().withState().accepted();
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageConsumer consumer = session.createConsumer(topic);
connection.start();
final Message received = consumer.receive(5_000);
assertNotNull(received);
consumer.close();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete();
peer.close();
}
}
@Test
public void testIncomingMessageWithToFieldArrivesOnConfiguredAddress() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.start();
final URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
element.setName(getTestName());
element.setMatchAddress("test");
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);
amqpConnection.addElement(element);
amqpConnection.setAutostart(true);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
.withTarget().withAddress("test").and()
.respondInKind();
peer.expectFlow();
peer.remoteTransfer().withDeliveryId(1)
.withProperties().withTo("should-not-be-used").also()
.withBody().withString("test-body").also()
.queue();
peer.expectDisposition().withState().accepted();
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageConsumer consumer = session.createConsumer(topic);
connection.start();
final Message received = consumer.receive(5_000);
assertNotNull(received);
consumer.close();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete();
peer.close();
}
}
@Test
public void testBrokerConnectionRetriesReceiverOnRemoteIfAttachRejected() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().reject(true, LinkError.DETACH_FORCED.toString(), "Attach refused");
peer.expectDetach().optional();
peer.expectClose().optional();
peer.expectConnectionToDrop();
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respondInKind();
peer.expectFlow();
peer.start();
final URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
element.setName(getTestName());
element.setMatchAddress("test");
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(10);
amqpConnection.setRetryInterval(100);
amqpConnection.addElement(element);
amqpConnection.setAutostart(true);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
peer.waitForScriptToComplete();
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete();
peer.close();
}
}
@Test
public void testBrokerConnectionRetriesReceiverOnRemoteIfTargetQueueRemovedAndLaterAddedBack() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.execute(() -> {
try {
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
} catch (Exception e) {
LOG.warn("Error on creating server address and queue: ", e);
}
}).queue();
peer.expectAttach().ofReceiver();
peer.execute(() -> {
try {
server.removeAddressInfo(SimpleString.of("test"), null, true);
} catch (Exception e) {
LOG.warn("Error on removing server address and queue: ", e);
}
peer.respondToLastAttach().now();
}).queue();
peer.expectDetach().respond();
peer.execute(() -> {
try {
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
} catch (Exception e) {
LOG.warn("Error on creating server address and queue: ", e);
}
}).queue();
peer.expectAttach().ofReceiver().respondInKind();
peer.expectFlow();
peer.start();
final URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
element.setName(getTestName());
element.setMatchAddress("test");
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(10);
amqpConnection.setRetryInterval(50);
amqpConnection.addElement(element);
amqpConnection.setAutostart(true);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete();
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete();
peer.close();
}
}
}