diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandler.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandler.java similarity index 93% rename from elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandler.java rename to elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandler.java index 5af88b8c6f6..bd4bd2cb356 100644 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandler.java +++ b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandler.java @@ -29,7 +29,7 @@ import java.util.Queue; * NOTE: This class assumes that the transport will not use a closed channel again or attempt to reconnect, which * is the way that Netty3Transport currently works */ -public class HandshakeWaitingHandler extends SimpleChannelHandler { +public class Netty3HandshakeWaitingHandler extends SimpleChannelHandler { private final ESLogger logger; @@ -39,7 +39,7 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler { /** * @param logger We pass a context aware logger here (logger that is aware of the node name & env) */ - public HandshakeWaitingHandler(ESLogger logger) { + public Netty3HandshakeWaitingHandler(ESLogger logger) { this.logger = logger; } @@ -56,13 +56,13 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler { // We synchronize here to allow all pending writes to be processed prior to any writes coming from // another thread - synchronized (HandshakeWaitingHandler.this) { + synchronized (Netty3HandshakeWaitingHandler.this) { handshaken = true; while (!pendingWrites.isEmpty()) { MessageEvent event = pendingWrites.remove(); ctx.sendDownstream(event); } - ctx.getPipeline().remove(HandshakeWaitingHandler.class); + ctx.getPipeline().remove(Netty3HandshakeWaitingHandler.class); } ctx.sendUpstream(e); @@ -74,7 +74,7 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler { logger.error("SSL/TLS handshake failed, closing channel: {}", cause.getMessage()); } - synchronized (HandshakeWaitingHandler.this) { + synchronized (Netty3HandshakeWaitingHandler.this) { // Set failure on the futures of each message so that listeners are called while (!pendingWrites.isEmpty()) { DownstreamMessageEvent event = (DownstreamMessageEvent) pendingWrites.remove(); diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/SecurityNetty3Transport.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/SecurityNetty3Transport.java index b2b90737bfa..8eb6c13a32c 100644 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/SecurityNetty3Transport.java +++ b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty3/SecurityNetty3Transport.java @@ -235,7 +235,7 @@ public class SecurityNetty3Transport extends Netty3Transport { sslEngine.setUseClientMode(true); ctx.getPipeline().replace(this, "ssl", new SslHandler(sslEngine)); - ctx.getPipeline().addAfter("ssl", "handshake", new HandshakeWaitingHandler(logger)); + ctx.getPipeline().addAfter("ssl", "handshake", new Netty3HandshakeWaitingHandler(logger)); ctx.sendDownstream(e); } diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/HandshakeWaitingHandler.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/HandshakeWaitingHandler.java deleted file mode 100644 index e20f8b21cd0..00000000000 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/HandshakeWaitingHandler.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.security.transport.netty4; - -import org.elasticsearch.common.logging.ESLogger; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.DownstreamMessageEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.handler.ssl.SslHandler; - -import java.util.LinkedList; -import java.util.Queue; - -/** - * Netty requires that nothing be written to the channel prior to the handshake. Writing before the handshake - * completes, results in odd SSLExceptions being thrown. Channel writes can happen from any thread that - * can access the channel and Netty does not provide a way to ensure the handshake has occurred before the - * application writes to the channel. This handler will queue up writes until the handshake has occurred and - * then will pass the writes through the pipeline. After all writes have been completed, this handler removes - * itself from the pipeline. - * - * NOTE: This class assumes that the transport will not use a closed channel again or attempt to reconnect, which - * is the way that Netty3Transport currently works - */ -public class HandshakeWaitingHandler extends SimpleChannelHandler { - - private final ESLogger logger; - - private boolean handshaken = false; - private Queue pendingWrites = new LinkedList<>(); - - /** - * @param logger We pass a context aware logger here (logger that is aware of the node name & env) - */ - public HandshakeWaitingHandler(ESLogger logger) { - this.logger = logger; - } - - @Override - public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception { - SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); - - final ChannelFuture handshakeFuture = sslHandler.handshake(); - handshakeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (handshakeFuture.isSuccess()) { - logger.debug("SSL/TLS handshake completed for channel"); - - // We synchronize here to allow all pending writes to be processed prior to any writes coming from - // another thread - synchronized (HandshakeWaitingHandler.this) { - handshaken = true; - while (!pendingWrites.isEmpty()) { - MessageEvent event = pendingWrites.remove(); - ctx.sendDownstream(event); - } - ctx.getPipeline().remove(HandshakeWaitingHandler.class); - } - - ctx.sendUpstream(e); - } else { - Throwable cause = handshakeFuture.getCause(); - if (logger.isDebugEnabled()) { - logger.debug("SSL/TLS handshake failed, closing channel: {}", cause, cause.getMessage()); - } else { - logger.error("SSL/TLS handshake failed, closing channel: {}", cause.getMessage()); - } - - synchronized (HandshakeWaitingHandler.this) { - // Set failure on the futures of each message so that listeners are called - while (!pendingWrites.isEmpty()) { - DownstreamMessageEvent event = (DownstreamMessageEvent) pendingWrites.remove(); - event.getFuture().setFailure(cause); - } - - // Some writes may be waiting to acquire lock, if so the SetFailureOnAddQueue will set - // failure on their futures - pendingWrites = new SetFailureOnAddQueue(cause); - handshakeFuture.getChannel().close(); - } - } - } - }); - } - - @Override - public synchronized void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - // Writes can come from any thread so we need to ensure that we do not let any through - // until handshake has completed - if (!handshaken) { - pendingWrites.add(e); - return; - } - ctx.sendDownstream(e); - } - - synchronized boolean hasPendingWrites() { - return !pendingWrites.isEmpty(); - } - - private static class SetFailureOnAddQueue extends LinkedList { - private final Throwable cause; - - SetFailureOnAddQueue(Throwable cause) { - super(); - this.cause = cause; - } - - @Override - public boolean add(MessageEvent messageEvent) { - DownstreamMessageEvent event = (DownstreamMessageEvent) messageEvent; - event.getFuture().setFailure(cause); - return false; - } - - } -} diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4Transport.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4Transport.java index 4a235900c6e..ece004c8b9d 100644 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4Transport.java +++ b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4Transport.java @@ -8,18 +8,19 @@ package org.elasticsearch.xpack.security.transport.netty4; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -35,14 +36,14 @@ import javax.net.ssl.SSLParameters; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.security.Security.featureEnabledSetting; import static org.elasticsearch.xpack.security.Security.setting; import static org.elasticsearch.xpack.security.Security.settingPrefix; -import org.elasticsearch.common.settings.Setting.Property; - /** * Implementation of a transport that extends the {@link Netty4Transport} to add SSL and IP Filtering @@ -141,13 +142,18 @@ public class SecurityNetty4Transport extends Netty4Transport { */ @Override protected void onAfterChannelsConnected(NodeChannels nodeChannels) { + List, Channel>> handshakes = new ArrayList<>(); for (Channel channel : nodeChannels.allChannels) { SslHandler handler = channel.pipeline().get(SslHandler.class); if (handler != null) { - handler.handshakeFuture().awaitUninterruptibly(30L, TimeUnit.SECONDS); - if (!handler.handshakeFuture().isSuccess()) { - throw new ElasticsearchException("handshake failed for channel [{}]", channel); - } + handshakes.add(Tuple.tuple(handler.handshakeFuture(), channel)); + } + } + + for (Tuple, Channel> handshake : handshakes) { + handshake.v1().awaitUninterruptibly(30L, TimeUnit.SECONDS); + if (!handshake.v1().isSuccess()) { + throw new ElasticsearchException("handshake failed for channel [{}]", handshake.v2()); } } } diff --git a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandlerTests.java b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandlerTests.java similarity index 97% rename from elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandlerTests.java rename to elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandlerTests.java index 8eed8b303b0..6f6edf6fbea 100644 --- a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/HandshakeWaitingHandlerTests.java +++ b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/transport/netty3/Netty3HandshakeWaitingHandlerTests.java @@ -52,7 +52,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -public class HandshakeWaitingHandlerTests extends ESTestCase { +public class Netty3HandshakeWaitingHandlerTests extends ESTestCase { private static final int CONCURRENT_CLIENT_REQUESTS = 20; private int iterations; @@ -145,7 +145,7 @@ public class HandshakeWaitingHandlerTests extends ESTestCase { engine.setUseClientMode(true); return Channels.pipeline( new SslHandler(engine), - new HandshakeWaitingHandler(Loggers.getLogger(HandshakeWaitingHandler.class))); + new Netty3HandshakeWaitingHandler(Loggers.getLogger(Netty3HandshakeWaitingHandler.class))); } }); @@ -162,7 +162,7 @@ public class HandshakeWaitingHandlerTests extends ESTestCase { // Wait for pending writes to prevent IOExceptions Channel channel = handshakeFuture.getChannel(); - HandshakeWaitingHandler handler = channel.getPipeline().get(HandshakeWaitingHandler.class); + Netty3HandshakeWaitingHandler handler = channel.getPipeline().get(Netty3HandshakeWaitingHandler.class); if (handler != null) { boolean noMoreWrites = awaitBusy(() -> handler.hasPendingWrites() == false); assertTrue(noMoreWrites);