This commit is contained in:
Justin Bertram 2018-03-22 10:46:40 -05:00
commit e26c051617
3 changed files with 57 additions and 9 deletions

View File

@ -59,6 +59,8 @@ public class AMQPClientConnectionFactory {
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
delegate.addFailureListener(connectionCallback);
delegate.addCloseListener(connectionCallback);
connectionCallback.setProtonConnectionDelegate(delegate);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.client;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@ -63,7 +64,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
RemotingConnection connection = connectionMap.remove(connectionID);
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " destroyed");
connection.disconnect(false);
connection.fail(new ActiveMQRemoteDisconnectException());
} else {
log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
}
@ -93,7 +94,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
public void stop() {
for (RemotingConnection connection : connectionMap.values()) {
connection.disconnect(false);
connection.destroy();
}
}
@ -106,4 +107,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
}
}
public RemotingConnection getConnection(Object connectionId) {
return connectionMap.get(connectionId);
}
}

View File

@ -23,7 +23,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -34,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolMana
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
@ -45,16 +50,21 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testOutboundConnection() throws Throwable {
runOutboundConnectionTest(false);
runOutboundConnectionTest(false, true);
}
@Test(timeout = 60000)
public void testOutboundConnectionServerClose() throws Throwable {
runOutboundConnectionTest(false, false);
}
@Test(timeout = 60000)
public void testOutboundConnectionWithSecurity() throws Throwable {
runOutboundConnectionTest(true);
runOutboundConnectionTest(true, true);
}
private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
private void runOutboundConnectionTest(boolean withSecurity, boolean closeFromClient) throws Exception {
final ActiveMQServer remote;
try {
securityEnabled = withSecurity;
@ -92,17 +102,48 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start();
connector.createConnection();
Object connectionId = connector.createConnection().getID();
assertNotNull(connectionId);
RemotingConnection remotingConnection = lifeCycleListener.getConnection(connectionId);
AtomicReference<ActiveMQException> ex = new AtomicReference<>();
AtomicBoolean closed = new AtomicBoolean(false);
remotingConnection.addCloseListener(() -> closed.set(true));
remotingConnection.addFailureListener(new FailureListener() {
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
ex.set(exception);
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
ex.set(exception);
}
});
try {
Wait.assertEquals(1, remote::getConnectionCount);
Wait.assertTrue(connectionOpened::get);
if (closeFromClient) {
lifeCycleListener.stop();
} else {
remote.stop();
}
Wait.assertEquals(0, remote::getConnectionCount);
assertTrue(remotingConnection.isDestroyed());
if (!closeFromClient) {
assertTrue(ex.get() instanceof ActiveMQRemoteDisconnectException);
} else {
assertNull(ex.get());
}
} finally {
lifeCycleListener.stop();
if (closeFromClient) {
remote.stop();
} else {
lifeCycleListener.stop();
}
}
}