diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 4a87770817..32deb385bb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -47,6 +47,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Stream; import io.netty.bootstrap.Bootstrap; @@ -604,6 +605,7 @@ public class NettyConnector extends AbstractConnector { protocolManager.addChannelHandlers(pipeline); pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor)); + logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id()); } }); @@ -737,6 +739,20 @@ public class NettyConnector extends AbstractConnector { @Override public Connection createConnection() { + return createConnection(null); + } + + /** + * Create and return a connection from this connector. + *

+ * This method must NOT throw an exception if it fails to create the connection + * (e.g. network is not available), in this case it MUST return null.
+ * This version can be used for testing purposes. + * + * @param onConnect a callback that would be called right after {@link Bootstrap#connect()} + * @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable) + */ + public final Connection createConnection(Consumer onConnect) { if (channelClazz == null) { return null; } @@ -758,7 +774,9 @@ public class NettyConnector extends AbstractConnector { } else { future = bootstrap.connect(remoteDestination); } - + if (onConnect != null) { + onConnect.accept(future); + } future.awaitUninterruptibly(); if (future.isSuccess()) { @@ -770,7 +788,15 @@ public class NettyConnector extends AbstractConnector { if (handshakeFuture.isSuccess()) { ChannelPipeline channelPipeline = ch.pipeline(); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; + if (channelHandler != null) { + channelHandler.active = true; + } else { + ch.close().awaitUninterruptibly(); + ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection( + new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + + remoteDestination + " from Channel with id = " + ch.id())); + return null; + } } else { ch.close().awaitUninterruptibly(); ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause()); @@ -830,7 +856,15 @@ public class NettyConnector extends AbstractConnector { } else { ChannelPipeline channelPipeline = ch.pipeline(); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; + if (channelHandler != null) { + channelHandler.active = true; + } else { + ch.close().awaitUninterruptibly(); + ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection( + new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + + remoteDestination + " from Channel with id = " + ch.id())); + return null; + } } // No acceptor on a client connection diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java index 98293299e1..ffd2cd4cac 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java @@ -20,11 +20,14 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -361,4 +364,34 @@ public class NettyConnectorTest extends ActiveMQTestBase { connector.close(); Assert.assertFalse(connector.isStarted()); } + + @Test + public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception { + BufferHandler handler = (connectionID, buffer) -> { + }; + Map params = new HashMap<>(); + final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()); + try { + NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + connector.start(); + final Connection connection = connector.createConnection(future -> { + future.awaitUninterruptibly(); + Assert.assertTrue(future.isSuccess()); + final ChannelPipeline pipeline = future.channel().pipeline(); + final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class); + Assert.assertNotNull(activeMQChannelHandler); + pipeline.remove(activeMQChannelHandler); + Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class)); + }); + Assert.assertNull(connection); + connector.close(); + } finally { + closeExecutor.shutdownNow(); + threadPool.shutdownNow(); + scheduledThreadPool.shutdownNow(); + } + } + }