Cleanup Duplication in Netty4 Module (#40148) (#40563)

* Just drying up the listener/promise handling a little
This commit is contained in:
Armin Braun 2019-03-28 00:57:58 +01:00 committed by GitHub
parent 5a2ba34174
commit ebcb925afb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 93 deletions

View File

@ -20,12 +20,11 @@
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import java.net.InetSocketAddress;
@ -36,38 +35,12 @@ public class Netty4HttpChannel implements HttpChannel {
Netty4HttpChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
}
@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
});
channel.writeAndFlush(response, writePromise);
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
}
@Override

View File

@ -38,7 +38,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
@ -72,7 +72,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {

View File

@ -20,10 +20,10 @@
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import java.net.InetSocketAddress;
@ -34,19 +34,7 @@ public class Netty4HttpServerChannel implements HttpServerChannel {
Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
}
@Override

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty4;
import io.netty.util.internal.logging.AbstractInternalLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.SuppressLoggerChecks;
@SuppressLoggerChecks(reason = "safely delegates to logger")

View File

@ -46,35 +46,56 @@ public class Netty4TcpChannel implements TcpChannel {
this.isServer = isServer;
this.profile = profile;
this.connectContext = new CompletableContext<>();
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
addListener(this.channel.closeFuture(), closeContext);
addListener(connectFuture, connectContext);
}
connectFuture.addListener(f -> {
/**
* Adds a listener that completes the given {@link CompletableContext} to the given {@link ChannelFuture}.
* @param channelFuture Channel future
* @param context Context to complete
*/
public static void addListener(ChannelFuture channelFuture, CompletableContext<Void> context) {
channelFuture.addListener(f -> {
if (f.isSuccess()) {
connectContext.complete(null);
context.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
connectContext.completeExceptionally(new Exception(cause));
context.completeExceptionally(new Exception(cause));
} else {
connectContext.completeExceptionally((Exception) cause);
context.completeExceptionally((Exception) cause);
}
}
});
}
/**
* Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener}
* on its completion.
* @param listener lister to invoke
* @param channel channel
* @return write promise
*/
public static ChannelPromise addPromise(ActionListener<Void> listener, Channel channel) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
});
return writePromise;
}
@Override
public void close() {
channel.close();
@ -122,21 +143,7 @@ public class Netty4TcpChannel implements TcpChannel {
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
});
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));
if (channel.eventLoop().isShutdown()) {
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpServerChannel;
@ -36,19 +35,7 @@ public class Netty4TcpServerChannel implements TcpServerChannel {
Netty4TcpServerChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
}
@Override