diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java index c633db8be5..af65937eec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java @@ -59,6 +59,8 @@ public class AMQPClientConnectionFactory { eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); + delegate.addFailureListener(connectionCallback); + delegate.addCloseListener(connectionCallback); connectionCallback.setProtonConnectionDelegate(delegate); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java index df0de77eab..f19aace0bb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.client; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; @@ -63,7 +64,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis RemotingConnection connection = connectionMap.remove(connectionID); if (connection != null) { log.info("Connection " + connection.getRemoteAddress() + " destroyed"); - connection.disconnect(false); + connection.fail(new ActiveMQRemoteDisconnectException()); } else { log.error("Connection with id " + connectionID + " not found in connectionDestroyed"); } @@ -93,7 +94,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis public void stop() { for (RemotingConnection connection : connectionMap.values()) { - connection.disconnect(false); + connection.destroy(); } } @@ -106,4 +107,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis log.error("Connection with id " + connectionID + " not found in bufferReceived()!"); } } + + public RemotingConnection getConnection(Object connectionId) { + return connectionMap.get(connectionId); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java index 8f98715176..1156e183c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java @@ -23,7 +23,11 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; +import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -34,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolMana import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Connection; @@ -45,16 +50,21 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testOutboundConnection() throws Throwable { - runOutboundConnectionTest(false); + runOutboundConnectionTest(false, true); + } + + @Test(timeout = 60000) + public void testOutboundConnectionServerClose() throws Throwable { + runOutboundConnectionTest(false, false); } @Test(timeout = 60000) public void testOutboundConnectionWithSecurity() throws Throwable { - runOutboundConnectionTest(true); + runOutboundConnectionTest(true, true); } - private void runOutboundConnectionTest(boolean withSecurity) throws Exception { + private void runOutboundConnectionTest(boolean withSecurity, boolean closeFromClient) throws Exception { final ActiveMQServer remote; try { securityEnabled = withSecurity; @@ -92,17 +102,48 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server); NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager); connector.start(); - connector.createConnection(); + + Object connectionId = connector.createConnection().getID(); + assertNotNull(connectionId); + RemotingConnection remotingConnection = lifeCycleListener.getConnection(connectionId); + + AtomicReference ex = new AtomicReference<>(); + AtomicBoolean closed = new AtomicBoolean(false); + remotingConnection.addCloseListener(() -> closed.set(true)); + remotingConnection.addFailureListener(new FailureListener() { + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + ex.set(exception); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + ex.set(exception); + } + }); try { Wait.assertEquals(1, remote::getConnectionCount); Wait.assertTrue(connectionOpened::get); - lifeCycleListener.stop(); + if (closeFromClient) { + lifeCycleListener.stop(); + } else { + remote.stop(); + } Wait.assertEquals(0, remote::getConnectionCount); + assertTrue(remotingConnection.isDestroyed()); + if (!closeFromClient) { + assertTrue(ex.get() instanceof ActiveMQRemoteDisconnectException); + } else { + assertNull(ex.get()); + } } finally { - lifeCycleListener.stop(); - remote.stop(); + if (closeFromClient) { + remote.stop(); + } else { + lifeCycleListener.stop(); + } } }