Cleanup Netty handshake waiting handlers

This commit cleans up the Netty handshake waiting handlers. We rename
the Netty 3 implementation to include "Netty3" in the name, the Netty 4
implementation is not needed, and we improve the handling of waiting for
the handshakes to complete when connecting.

Original commit: elastic/x-pack-elasticsearch@f736fdc8f0
This commit is contained in:
Jason Tedor 2016-08-02 21:27:40 -04:00
parent 8579dbf80b
commit f4ba670b25
5 changed files with 23 additions and 142 deletions

View File

@ -29,7 +29,7 @@ import java.util.Queue;
* NOTE: This class assumes that the transport will not use a closed channel again or attempt to reconnect, which
* is the way that Netty3Transport currently works
*/
public class HandshakeWaitingHandler extends SimpleChannelHandler {
public class Netty3HandshakeWaitingHandler extends SimpleChannelHandler {
private final ESLogger logger;
@ -39,7 +39,7 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler {
/**
* @param logger We pass a context aware logger here (logger that is aware of the node name & env)
*/
public HandshakeWaitingHandler(ESLogger logger) {
public Netty3HandshakeWaitingHandler(ESLogger logger) {
this.logger = logger;
}
@ -56,13 +56,13 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler {
// We synchronize here to allow all pending writes to be processed prior to any writes coming from
// another thread
synchronized (HandshakeWaitingHandler.this) {
synchronized (Netty3HandshakeWaitingHandler.this) {
handshaken = true;
while (!pendingWrites.isEmpty()) {
MessageEvent event = pendingWrites.remove();
ctx.sendDownstream(event);
}
ctx.getPipeline().remove(HandshakeWaitingHandler.class);
ctx.getPipeline().remove(Netty3HandshakeWaitingHandler.class);
}
ctx.sendUpstream(e);
@ -74,7 +74,7 @@ public class HandshakeWaitingHandler extends SimpleChannelHandler {
logger.error("SSL/TLS handshake failed, closing channel: {}", cause.getMessage());
}
synchronized (HandshakeWaitingHandler.this) {
synchronized (Netty3HandshakeWaitingHandler.this) {
// Set failure on the futures of each message so that listeners are called
while (!pendingWrites.isEmpty()) {
DownstreamMessageEvent event = (DownstreamMessageEvent) pendingWrites.remove();

View File

@ -235,7 +235,7 @@ public class SecurityNetty3Transport extends Netty3Transport {
sslEngine.setUseClientMode(true);
ctx.getPipeline().replace(this, "ssl", new SslHandler(sslEngine));
ctx.getPipeline().addAfter("ssl", "handshake", new HandshakeWaitingHandler(logger));
ctx.getPipeline().addAfter("ssl", "handshake", new Netty3HandshakeWaitingHandler(logger));
ctx.sendDownstream(e);
}

View File

@ -1,125 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport.netty4;
import org.elasticsearch.common.logging.ESLogger;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import java.util.LinkedList;
import java.util.Queue;
/**
* Netty requires that nothing be written to the channel prior to the handshake. Writing before the handshake
* completes, results in odd SSLExceptions being thrown. Channel writes can happen from any thread that
* can access the channel and Netty does not provide a way to ensure the handshake has occurred before the
* application writes to the channel. This handler will queue up writes until the handshake has occurred and
* then will pass the writes through the pipeline. After all writes have been completed, this handler removes
* itself from the pipeline.
*
* NOTE: This class assumes that the transport will not use a closed channel again or attempt to reconnect, which
* is the way that Netty3Transport currently works
*/
public class HandshakeWaitingHandler extends SimpleChannelHandler {
private final ESLogger logger;
private boolean handshaken = false;
private Queue<MessageEvent> pendingWrites = new LinkedList<>();
/**
* @param logger We pass a context aware logger here (logger that is aware of the node name &amp; env)
*/
public HandshakeWaitingHandler(ESLogger logger) {
this.logger = logger;
}
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
final ChannelFuture handshakeFuture = sslHandler.handshake();
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (handshakeFuture.isSuccess()) {
logger.debug("SSL/TLS handshake completed for channel");
// We synchronize here to allow all pending writes to be processed prior to any writes coming from
// another thread
synchronized (HandshakeWaitingHandler.this) {
handshaken = true;
while (!pendingWrites.isEmpty()) {
MessageEvent event = pendingWrites.remove();
ctx.sendDownstream(event);
}
ctx.getPipeline().remove(HandshakeWaitingHandler.class);
}
ctx.sendUpstream(e);
} else {
Throwable cause = handshakeFuture.getCause();
if (logger.isDebugEnabled()) {
logger.debug("SSL/TLS handshake failed, closing channel: {}", cause, cause.getMessage());
} else {
logger.error("SSL/TLS handshake failed, closing channel: {}", cause.getMessage());
}
synchronized (HandshakeWaitingHandler.this) {
// Set failure on the futures of each message so that listeners are called
while (!pendingWrites.isEmpty()) {
DownstreamMessageEvent event = (DownstreamMessageEvent) pendingWrites.remove();
event.getFuture().setFailure(cause);
}
// Some writes may be waiting to acquire lock, if so the SetFailureOnAddQueue will set
// failure on their futures
pendingWrites = new SetFailureOnAddQueue(cause);
handshakeFuture.getChannel().close();
}
}
}
});
}
@Override
public synchronized void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// Writes can come from any thread so we need to ensure that we do not let any through
// until handshake has completed
if (!handshaken) {
pendingWrites.add(e);
return;
}
ctx.sendDownstream(e);
}
synchronized boolean hasPendingWrites() {
return !pendingWrites.isEmpty();
}
private static class SetFailureOnAddQueue extends LinkedList<MessageEvent> {
private final Throwable cause;
SetFailureOnAddQueue(Throwable cause) {
super();
this.cause = cause;
}
@Override
public boolean add(MessageEvent messageEvent) {
DownstreamMessageEvent event = (DownstreamMessageEvent) messageEvent;
event.getFuture().setFailure(cause);
return false;
}
}
}

View File

@ -8,18 +8,19 @@ package org.elasticsearch.xpack.security.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -35,14 +36,14 @@ import javax.net.ssl.SSLParameters;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.security.Security.featureEnabledSetting;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.Security.settingPrefix;
import org.elasticsearch.common.settings.Setting.Property;
/**
* Implementation of a transport that extends the {@link Netty4Transport} to add SSL and IP Filtering
@ -141,13 +142,18 @@ public class SecurityNetty4Transport extends Netty4Transport {
*/
@Override
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
List<Tuple<Future<Channel>, Channel>> handshakes = new ArrayList<>();
for (Channel channel : nodeChannels.allChannels) {
SslHandler handler = channel.pipeline().get(SslHandler.class);
if (handler != null) {
handler.handshakeFuture().awaitUninterruptibly(30L, TimeUnit.SECONDS);
if (!handler.handshakeFuture().isSuccess()) {
throw new ElasticsearchException("handshake failed for channel [{}]", channel);
}
handshakes.add(Tuple.tuple(handler.handshakeFuture(), channel));
}
}
for (Tuple<Future<Channel>, Channel> handshake : handshakes) {
handshake.v1().awaitUninterruptibly(30L, TimeUnit.SECONDS);
if (!handshake.v1().isSuccess()) {
throw new ElasticsearchException("handshake failed for channel [{}]", handshake.v2());
}
}
}

View File

@ -52,7 +52,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class HandshakeWaitingHandlerTests extends ESTestCase {
public class Netty3HandshakeWaitingHandlerTests extends ESTestCase {
private static final int CONCURRENT_CLIENT_REQUESTS = 20;
private int iterations;
@ -145,7 +145,7 @@ public class HandshakeWaitingHandlerTests extends ESTestCase {
engine.setUseClientMode(true);
return Channels.pipeline(
new SslHandler(engine),
new HandshakeWaitingHandler(Loggers.getLogger(HandshakeWaitingHandler.class)));
new Netty3HandshakeWaitingHandler(Loggers.getLogger(Netty3HandshakeWaitingHandler.class)));
}
});
@ -162,7 +162,7 @@ public class HandshakeWaitingHandlerTests extends ESTestCase {
// Wait for pending writes to prevent IOExceptions
Channel channel = handshakeFuture.getChannel();
HandshakeWaitingHandler handler = channel.getPipeline().get(HandshakeWaitingHandler.class);
Netty3HandshakeWaitingHandler handler = channel.getPipeline().get(Netty3HandshakeWaitingHandler.class);
if (handler != null) {
boolean noMoreWrites = awaitBusy(() -> handler.hasPendingWrites() == false);
assertTrue(noMoreWrites);