diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 6b3c6b45da..7617b95921 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -75,6 +75,8 @@ import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederati import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter; @@ -84,6 +86,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMes import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.proton.SenderController; import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; @@ -284,13 +287,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY}; connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null); connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability); - } else { - if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { - connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null); - } - if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { - connectReceiver(protonRemotingConnection, session, sessionContext, queue); - } + } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { + connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null); + } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { + connectReceiver(protonRemotingConnection, session, sessionContext, queue); } } @@ -644,34 +644,105 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } protonRemotingConnection.getAmqpConnection().runLater(() -> { - - if (receivers.contains(queue)) { + if (!receivers.add(queue)) { logger.debug("Receiver for queue {} already exists, just giving up", queue); return; } - receivers.add(queue); - Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID()); - receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - Target target = new Target(); - target.setAddress(queue.getAddress().toString()); - receiver.setTarget(target); - Source source = new Source(); - source.setAddress(queue.getAddress().toString()); - receiver.setSource(source); - - if (capabilities != null) { - source.setCapabilities(capabilities); - } - - receiver.open(); - protonRemotingConnection.getAmqpConnection().flush(); try { - sessionContext.addReceiver(receiver); + final String linkName = queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID(); + final Receiver receiver = session.receiver(linkName); + final String queueAddress = queue.getAddress().toString(); + + final Target target = new Target(); + target.setAddress(queueAddress); + final Source source = new Source(); + source.setAddress(queueAddress); + if (capabilities != null) { + source.setCapabilities(capabilities); + } + + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + receiver.setTarget(target); + receiver.setSource(source); + receiver.open(); + + final ScheduledFuture openTimeoutTask; + final AtomicBoolean openTimedOut = new AtomicBoolean(false); + + if (getConnectionTimeout() > 0) { + openTimeoutTask = server.getScheduledPool().schedule(() -> { + openTimedOut.set(true); + error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter); + }, getConnectionTimeout(), TimeUnit.MILLISECONDS); + } else { + openTimeoutTask = null; + } + + // Await the remote attach before creating the broker receiver in order to impose a timeout + // on the attach response and then try and create the local server receiver context and finish + // the wiring. + receiver.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { + try { + if (openTimeoutTask != null) { + openTimeoutTask.cancel(false); + } + + if (openTimedOut.get()) { + return; // Timed out before remote attach arrived + } + + if (receiver.getRemoteSource() != null) { + logger.trace("AMQP Broker Connection Receiver {} completed open", linkName); + } else { + logger.debug("AMQP Broker Connection Receiver {} rejected by remote", linkName); + error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.receiverLinkRefused(queueAddress), lastRetryCounter); + return; + } + + sessionContext.addReceiver(receiver, (r, s) -> { + // Returns a customized server receiver context that will respect the locally initiated state + // when the receiver is initialized vs the remotely sent target as we want to ensure we attach + // the receiver to the address we set in our local state. + return new ProtonServerReceiverContext(sessionContext.getSessionSPI(), + sessionContext.getAMQPConnectionContext(), + sessionContext, receiver) { + + @Override + public void initialize() throws Exception { + initialized = true; + address = SimpleString.of(target.getAddress()); + defRoutingType = getRoutingType(target.getCapabilities(), address); + + try { + // Check if the queue that triggered the attach still exists or has it been removed + // before the attach response arrived from the remote peer. + if (!sessionSPI.queueQuery(queue.getName(), queue.getRoutingType(), false).isExists()) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString()); + } + } catch (ActiveMQAMQPException e) { + receivers.remove(queue); + throw e; + } catch (Exception e) { + logger.debug(e.getMessage(), e); + receivers.remove(queue); + throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + } + + flow(); + } + }; + }); + } catch (Exception e) { + error(e); + } + }); } catch (Exception e) { error(e); } + + protonRemotingConnection.getAmqpConnection().flush(); }); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java index 64ca31ed82..68be6c3278 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -115,4 +115,6 @@ public interface ActiveMQAMQPProtocolMessageBundle { @Message(id = 119028, value = "Malformed Federation event message: {}") ActiveMQException malformedFederationEventMessage(String message); + @Message(id = 119029, value = "Receiver link refused for address {}") + ActiveMQAMQPIllegalStateException receiverLinkRefused(String address); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java new file mode 100644 index 0000000000..199cf48d06 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.qpid.proton.amqp.transport.LinkError; +import org.apache.qpid.protonj2.test.driver.ProtonTestServer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the Receiver functionality on AMQP broker connections + */ +@Timeout(20) +public class AMQPBrokerConnectionReceiverTest extends AmqpClientTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE"; + } + + @Override + protected ActiveMQServer createServer() throws Exception { + // Creates the broker used to make the outgoing connection. The port passed is for + // that brokers acceptor. The test server connected to by the broker binds to a random port. + return createServer(AMQP_PORT, false); + } + + @Test + public void testBrokerConnectionCreatesReceiverOnRemote() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respondInKind(); + peer.expectFlow(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + LOG.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement(); + element.setType(AMQPBrokerConnectionAddressType.RECEIVER); + element.setName(getTestName()); + element.setMatchAddress("test"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0); + amqpConnection.addElement(element); + amqpConnection.setAutostart(true); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(); + peer.close(); + } + } + + @Test + public void testIncomingMessageWithNoToFieldArrivesOnConfiguredAddress() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + LOG.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement(); + element.setType(AMQPBrokerConnectionAddressType.RECEIVER); + element.setName(getTestName()); + element.setMatchAddress("test"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0); + amqpConnection.addElement(element); + amqpConnection.setAutostart(true); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver().withSource().withAddress("test").and() + .withTarget().withAddress("test").and() + .respondInKind(); + peer.expectFlow(); + peer.remoteTransfer().withDeliveryId(1) + .withBody().withString("test-body").also() + .queue(); + peer.expectDisposition().withState().accepted(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic("test"); + final MessageConsumer consumer = session.createConsumer(topic); + + connection.start(); + + final Message received = consumer.receive(5_000); + assertNotNull(received); + + consumer.close(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(); + peer.close(); + } + } + + @Test + public void testIncomingMessageWithToFieldArrivesOnConfiguredAddress() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + LOG.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement(); + element.setType(AMQPBrokerConnectionAddressType.RECEIVER); + element.setName(getTestName()); + element.setMatchAddress("test"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0); + amqpConnection.addElement(element); + amqpConnection.setAutostart(true); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver().withSource().withAddress("test").and() + .withTarget().withAddress("test").and() + .respondInKind(); + peer.expectFlow(); + peer.remoteTransfer().withDeliveryId(1) + .withProperties().withTo("should-not-be-used").also() + .withBody().withString("test-body").also() + .queue(); + peer.expectDisposition().withState().accepted(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic("test"); + final MessageConsumer consumer = session.createConsumer(topic); + + connection.start(); + + final Message received = consumer.receive(5_000); + assertNotNull(received); + + consumer.close(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(); + peer.close(); + } + } + + @Test + public void testBrokerConnectionRetriesReceiverOnRemoteIfAttachRejected() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().reject(true, LinkError.DETACH_FORCED.toString(), "Attach refused"); + peer.expectDetach().optional(); + peer.expectClose().optional(); + peer.expectConnectionToDrop(); + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respondInKind(); + peer.expectFlow(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + LOG.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement(); + element.setType(AMQPBrokerConnectionAddressType.RECEIVER); + element.setName(getTestName()); + element.setMatchAddress("test"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(10); + amqpConnection.setRetryInterval(100); + amqpConnection.addElement(element); + amqpConnection.setAutostart(true); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(); + peer.close(); + } + } + + @Test + public void testBrokerConnectionRetriesReceiverOnRemoteIfTargetQueueRemovedAndLaterAddedBack() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.execute(() -> { + try { + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + } catch (Exception e) { + LOG.warn("Error on creating server address and queue: ", e); + } + }).queue(); + peer.expectAttach().ofReceiver(); + peer.execute(() -> { + try { + server.removeAddressInfo(SimpleString.of("test"), null, true); + } catch (Exception e) { + LOG.warn("Error on removing server address and queue: ", e); + } + peer.respondToLastAttach().now(); + }).queue(); + peer.expectDetach().respond(); + peer.execute(() -> { + try { + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + } catch (Exception e) { + LOG.warn("Error on creating server address and queue: ", e); + } + }).queue(); + peer.expectAttach().ofReceiver().respondInKind(); + peer.expectFlow(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + LOG.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement(); + element.setType(AMQPBrokerConnectionAddressType.RECEIVER); + element.setName(getTestName()); + element.setMatchAddress("test"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(10); + amqpConnection.setRetryInterval(50); + amqpConnection.addElement(element); + amqpConnection.setAutostart(true); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(); + peer.close(); + } + } +}