diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index 73135c2a145..69d84dfb78f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -20,12 +20,11 @@ package org.elasticsearch.http.netty4; import io.netty.channel.Channel; -import io.netty.channel.ChannelPromise; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpResponse; +import org.elasticsearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; @@ -36,38 +35,12 @@ public class Netty4HttpChannel implements HttpChannel { Netty4HttpChannel(Channel channel) { this.channel = channel; - this.channel.closeFuture().addListener(f -> { - if (f.isSuccess()) { - closeContext.complete(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - closeContext.completeExceptionally(new Exception(cause)); - } else { - closeContext.completeExceptionally((Exception) cause); - } - } - }); + Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); } @Override public void sendResponse(HttpResponse response, ActionListener listener) { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - final Throwable cause = f.cause(); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - if (cause instanceof Error) { - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - channel.writeAndFlush(response, writePromise); + channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel)); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 472e34d09fc..cad95d26270 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -38,7 +38,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest msg) { Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); @@ -72,7 +72,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { - if (f.isSuccess()) { - closeContext.complete(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - closeContext.completeExceptionally(new Exception(cause)); - } else { - closeContext.completeExceptionally((Exception) cause); - } - } - }); + Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4InternalESLogger.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4InternalESLogger.java index 38527151695..4eca1803b63 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4InternalESLogger.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4InternalESLogger.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty4; import io.netty.util.internal.logging.AbstractInternalLogger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.common.SuppressLoggerChecks; @SuppressLoggerChecks(reason = "safely delegates to logger") diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index ef96f75be89..4c68466efc4 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -46,35 +46,56 @@ public class Netty4TcpChannel implements TcpChannel { this.isServer = isServer; this.profile = profile; this.connectContext = new CompletableContext<>(); - this.channel.closeFuture().addListener(f -> { - if (f.isSuccess()) { - closeContext.complete(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - closeContext.completeExceptionally(new Exception(cause)); - } else { - closeContext.completeExceptionally((Exception) cause); - } - } - }); + addListener(this.channel.closeFuture(), closeContext); + addListener(connectFuture, connectContext); + } - connectFuture.addListener(f -> { + /** + * Adds a listener that completes the given {@link CompletableContext} to the given {@link ChannelFuture}. + * @param channelFuture Channel future + * @param context Context to complete + */ + public static void addListener(ChannelFuture channelFuture, CompletableContext context) { + channelFuture.addListener(f -> { if (f.isSuccess()) { - connectContext.complete(null); + context.complete(null); } else { Throwable cause = f.cause(); if (cause instanceof Error) { ExceptionsHelper.maybeDieOnAnotherThread(cause); - connectContext.completeExceptionally(new Exception(cause)); + context.completeExceptionally(new Exception(cause)); } else { - connectContext.completeExceptionally((Exception) cause); + context.completeExceptionally((Exception) cause); } } }); } + /** + * Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener} + * on its completion. + * @param listener lister to invoke + * @param channel channel + * @return write promise + */ + public static ChannelPromise addPromise(ActionListener listener, Channel channel) { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { + if (f.isSuccess()) { + listener.onResponse(null); + } else { + final Throwable cause = f.cause(); + ExceptionsHelper.maybeDieOnAnotherThread(cause); + if (cause instanceof Error) { + listener.onFailure(new Exception(cause)); + } else { + listener.onFailure((Exception) cause); + } + } + }); + return writePromise; + } + @Override public void close() { channel.close(); @@ -122,21 +143,7 @@ public class Netty4TcpChannel implements TcpChannel { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - final Throwable cause = f.cause(); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - if (cause instanceof Error) { - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel)); if (channel.eventLoop().isShutdown()) { listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java index 9ef3f296f06..830b0a8c203 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.netty4; import io.netty.channel.Channel; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.transport.TcpServerChannel; @@ -36,19 +35,7 @@ public class Netty4TcpServerChannel implements TcpServerChannel { Netty4TcpServerChannel(Channel channel, String profile) { this.channel = channel; this.profile = profile; - this.channel.closeFuture().addListener(f -> { - if (f.isSuccess()) { - closeContext.complete(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - closeContext.completeExceptionally(new Exception(cause)); - } else { - closeContext.completeExceptionally((Exception) cause); - } - } - }); + Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); } @Override