This commit is contained in:
Clebert Suconic 2018-09-21 09:49:53 -04:00
commit cf525f014b
2 changed files with 70 additions and 3 deletions

View File

@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
@ -604,6 +605,7 @@ public class NettyConnector extends AbstractConnector {
protocolManager.addChannelHandlers(pipeline); protocolManager.addChannelHandlers(pipeline);
pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor)); pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id());
} }
}); });
@ -737,6 +739,20 @@ public class NettyConnector extends AbstractConnector {
@Override @Override
public Connection createConnection() { public Connection createConnection() {
return createConnection(null);
}
/**
* Create and return a connection from this connector.
* <p>
* This method must NOT throw an exception if it fails to create the connection
* (e.g. network is not available), in this case it MUST return null.<br>
* This version can be used for testing purposes.
*
* @param onConnect a callback that would be called right after {@link Bootstrap#connect()}
* @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable)
*/
public final Connection createConnection(Consumer<ChannelFuture> onConnect) {
if (channelClazz == null) { if (channelClazz == null) {
return null; return null;
} }
@ -758,7 +774,9 @@ public class NettyConnector extends AbstractConnector {
} else { } else {
future = bootstrap.connect(remoteDestination); future = bootstrap.connect(remoteDestination);
} }
if (onConnect != null) {
onConnect.accept(future);
}
future.awaitUninterruptibly(); future.awaitUninterruptibly();
if (future.isSuccess()) { if (future.isSuccess()) {
@ -770,7 +788,15 @@ public class NettyConnector extends AbstractConnector {
if (handshakeFuture.isSuccess()) { if (handshakeFuture.isSuccess()) {
ChannelPipeline channelPipeline = ch.pipeline(); ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
if (channelHandler != null) {
channelHandler.active = true; channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
} else { } else {
ch.close().awaitUninterruptibly(); ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause()); ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
@ -830,7 +856,15 @@ public class NettyConnector extends AbstractConnector {
} else { } else {
ChannelPipeline channelPipeline = ch.pipeline(); ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class); ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
if (channelHandler != null) {
channelHandler.active = true; channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
} }
// No acceptor on a client connection // No acceptor on a client connection

View File

@ -20,11 +20,14 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -361,4 +364,34 @@ public class NettyConnectorTest extends ActiveMQTestBase {
connector.close(); connector.close();
Assert.assertFalse(connector.isStarted()); Assert.assertFalse(connector.isStarted());
} }
@Test
public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception {
BufferHandler handler = (connectionID, buffer) -> {
};
Map<String, Object> params = new HashMap<>();
final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
try {
NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
connector.start();
final Connection connection = connector.createConnection(future -> {
future.awaitUninterruptibly();
Assert.assertTrue(future.isSuccess());
final ChannelPipeline pipeline = future.channel().pipeline();
final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class);
Assert.assertNotNull(activeMQChannelHandler);
pipeline.remove(activeMQChannelHandler);
Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
});
Assert.assertNull(connection);
connector.close();
} finally {
closeExecutor.shutdownNow();
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
}
}
} }