Netty request/response tracer should wait for send
We write to Netty channels in an async fashion, but notify listeners via a transport service adapter before we are certain that the channel write succeeded. In particular, the tracer logs are implemented via a transport service adapter and this means that we can write tracer logs before a write was successful and in some cases the write might fail leading to misleading logs. This commit attaches the transport service adapters to channel writes as a listener so that the notification occurs only after a successful write. Relates #18500
This commit is contained in:
parent
452faa220c
commit
4c7993ea71
|
@ -925,7 +925,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||||
future.addListener(listener);
|
future.addListener(listener);
|
||||||
addedReleaseListener = true;
|
addedReleaseListener = true;
|
||||||
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
|
final TransportRequestOptions finalOptions = options;
|
||||||
|
ChannelFutureListener channelFutureListener =
|
||||||
|
f -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions);
|
||||||
|
future.addListener(channelFutureListener);
|
||||||
} finally {
|
} finally {
|
||||||
if (!addedReleaseListener) {
|
if (!addedReleaseListener) {
|
||||||
Releasables.close(bStream.bytes());
|
Releasables.close(bStream.bytes());
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.transport.support.TransportStatus;
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
|
import org.jboss.netty.channel.ChannelFutureListener;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -114,7 +115,10 @@ public class NettyTransportChannel implements TransportChannel {
|
||||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||||
future.addListener(listener);
|
future.addListener(listener);
|
||||||
addedReleaseListener = true;
|
addedReleaseListener = true;
|
||||||
transportServiceAdapter.onResponseSent(requestId, action, response, options);
|
final TransportResponseOptions finalOptions = options;
|
||||||
|
ChannelFutureListener onResponseSentListener =
|
||||||
|
f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
|
||||||
|
future.addListener(onResponseSentListener);
|
||||||
} finally {
|
} finally {
|
||||||
if (!addedReleaseListener && bStream != null) {
|
if (!addedReleaseListener && bStream != null) {
|
||||||
Releasables.close(bStream.bytes());
|
Releasables.close(bStream.bytes());
|
||||||
|
@ -137,8 +141,10 @@ public class NettyTransportChannel implements TransportChannel {
|
||||||
BytesReference bytes = stream.bytes();
|
BytesReference bytes = stream.bytes();
|
||||||
ChannelBuffer buffer = bytes.toChannelBuffer();
|
ChannelBuffer buffer = bytes.toChannelBuffer();
|
||||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||||
channel.write(buffer);
|
ChannelFuture future = channel.write(buffer);
|
||||||
transportServiceAdapter.onResponseSent(requestId, action, error);
|
ChannelFutureListener onResponseSentListener =
|
||||||
|
f -> transportServiceAdapter.onResponseSent(requestId, action, error);
|
||||||
|
future.addListener(onResponseSentListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void close() {
|
private void close() {
|
||||||
|
|
Loading…
Reference in New Issue