From ba2d70d8eb62752327d71a4838c4a0cd2204c280 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 2 Jun 2020 16:15:18 +0200 Subject: [PATCH] Serialize Outbound Messages on IO Threads (#56961) (#57080) Almost every outbound message is serialized to buffers of 16k pagesize. We were serializing these messages off the IO loop (and retaining the concrete message instance as well) and would then enqueue it on the IO loop to be dealt with as soon as the channel is ready. 1. This would cause buffers to be held onto for longer than necessary, causing less reuse on average. 2. If a channel was slow for some reason, not only would concrete message instances queue up for it, but also 16k of buffers would be reserved for each message until it would be written+flushed physically. With this change, the serialization happens on the event loop which effectively limits the number of buffers that `N` IO-threads will ever use so long as messages are small and channels writable. Also, this change dereferences the reference to the concrete outbound message as soon as it has been serialized to save some more on GC. This reduces the GC time for a default PMC run by about 50% in experiments (3 nodes, 2G heap each, loopback ... obvious caveat is that GC isn't that heavy in the first place with recent changes but still a measurable gain). I also expect it to be helpful for master node stability by causing less of a spike if master is e.g. hit by a large number of requests that are processed batched (e.g. shard snapshot status updates) and responded to in a short time frame all at once. Obviously, the downside to this change is that it introduces more latency on the IO loop for the serialization. But since we read all of these messages on the IO loop as well I don't see it as much of a qualitative change really and the more predictable buffer use seems much more valuable relatively. --- .../netty4/Netty4MessageChannelHandler.java | 9 ++++-- .../transport/netty4/Netty4TcpChannel.java | 8 ++--- .../transport/nio/NioTcpChannel.java | 14 ++++++-- .../smoketest/ESSmokeClientTestCase.java | 2 +- .../transport/OutboundHandler.java | 32 ++++++++++--------- .../elasticsearch/transport/TcpChannel.java | 9 ++---- .../transport/InboundHandlerTests.java | 4 +-- .../transport/FakeTcpChannel.java | 12 +++++-- .../transport/nio/MockNioTransport.java | 12 +++++-- .../xpack/ESXPackSmokeClientTestCase.java | 2 +- 10 files changed, 65 insertions(+), 39 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index d6abd8e3b02..3826d98627e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -33,9 +33,11 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.InboundPipeline; +import org.elasticsearch.transport.OutboundHandler; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transports; +import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; @@ -88,9 +90,10 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { } @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - assert msg instanceof ByteBuf; - final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise)); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException { + assert msg instanceof OutboundHandler.SendContext; + final boolean queued = queuedWrites.offer( + new WriteOperation(Netty4Utils.toByteBuf(((OutboundHandler.SendContext) msg).get()), promise)); assert queued; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 4c68466efc4..8001056bad9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -25,8 +25,8 @@ import io.netty.channel.ChannelPromise; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.concurrent.CompletableContext; +import org.elasticsearch.transport.OutboundHandler; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportException; @@ -142,11 +142,11 @@ public class Netty4TcpChannel implements TcpChannel { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel)); + public void sendMessage(OutboundHandler.SendContext sendContext) { + channel.writeAndFlush(sendContext, addPromise(sendContext, channel)); if (channel.eventLoop().isShutdown()) { - listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); + sendContext.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java index 8e88f2cf6ab..d5e3e97f123 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java @@ -22,8 +22,10 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.transport.OutboundHandler; import org.elasticsearch.transport.TcpChannel; +import java.io.IOException; import java.nio.channels.SocketChannel; public class NioTcpChannel extends NioSocketChannel implements TcpChannel { @@ -38,8 +40,16 @@ public class NioTcpChannel extends NioSocketChannel implements TcpChannel { this.profile = profile; } - public void sendMessage(BytesReference reference, ActionListener listener) { - getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener)); + @Override + public void sendMessage(OutboundHandler.SendContext sendContext) { + final BytesReference message; + try { + message = sendContext.get(); + } catch (IOException e) { + sendContext.onFailure(e); + return; + } + getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext)); } @Override diff --git a/qa/smoke-test-client/src/test/java/org/elasticsearch/smoketest/ESSmokeClientTestCase.java b/qa/smoke-test-client/src/test/java/org/elasticsearch/smoketest/ESSmokeClientTestCase.java index 00460a8f004..f0e907b8750 100644 --- a/qa/smoke-test-client/src/test/java/org/elasticsearch/smoketest/ESSmokeClientTestCase.java +++ b/qa/smoke-test-client/src/test/java/org/elasticsearch/smoketest/ESSmokeClientTestCase.java @@ -159,7 +159,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase { } private void doClean() { - if (client != null) { + if (client != null && index != null) { try { client.admin().indices().prepareDelete(index).get(); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index b9b1bbb7a4e..5fd684c1b04 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -41,7 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Set; -final class OutboundHandler { +public final class OutboundHandler { private static final Logger logger = LogManager.getLogger(OutboundHandler.class); @@ -65,12 +65,7 @@ final class OutboundHandler { void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener listener) { SendContext sendContext = new SendContext(channel, () -> bytes, listener); - try { - internalSend(channel, sendContext); - } catch (IOException e) { - // This should not happen as the bytes are already serialized - throw new AssertionError(e); - } + internalSend(channel, sendContext); } /** @@ -124,11 +119,10 @@ final class OutboundHandler { internalSend(channel, sendContext); } - private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException { + private void internalSend(TcpChannel channel, SendContext sendContext) { channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); - BytesReference reference = sendContext.get(); try { - channel.sendMessage(reference, sendContext); + channel.sendMessage(sendContext); } catch (RuntimeException ex) { sendContext.onFailure(ex); CloseableChannel.closeChannel(channel); @@ -147,7 +141,7 @@ final class OutboundHandler { private static class MessageSerializer implements CheckedSupplier, Releasable { - private final OutboundMessage message; + private OutboundMessage message; private final BigArrays bigArrays; private volatile ReleasableBytesStreamOutput bytesStreamOutput; @@ -158,8 +152,12 @@ final class OutboundHandler { @Override public BytesReference get() throws IOException { - bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); - return message.serialize(bytesStreamOutput); + try { + bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); + return message.serialize(bytesStreamOutput); + } finally { + message = null; + } } @Override @@ -168,10 +166,10 @@ final class OutboundHandler { } } - private class SendContext extends NotifyOnceListener implements CheckedSupplier { + public class SendContext extends NotifyOnceListener implements CheckedSupplier { private final TcpChannel channel; - private final CheckedSupplier messageSupplier; + private CheckedSupplier messageSupplier; private final ActionListener listener; private final Releasable optionalReleasable; private long messageSize = -1; @@ -189,10 +187,13 @@ final class OutboundHandler { this.optionalReleasable = optionalReleasable; } + @Override public BytesReference get() throws IOException { BytesReference message; try { + assert messageSupplier != null; message = messageSupplier.get(); + messageSupplier = null; messageSize = message.length(); TransportLogger.logOutboundMessage(channel, message); return message; @@ -211,6 +212,7 @@ final class OutboundHandler { @Override protected void innerOnFailure(Exception e) { + messageSupplier = null; if (NetworkExceptionHelper.isCloseConnectionException(e)) { logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index d02b9ad9997..ab2b3d3589c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.unit.TimeValue; @@ -59,13 +58,11 @@ public interface TcpChannel extends CloseableChannel { InetSocketAddress getRemoteAddress(); /** - * Sends a tcp message to the channel. The listener will be executed once the send process has been - * completed. + * Sends a tcp message to the channel. * - * @param reference to send to channel - * @param listener to execute upon send completion + * @param sendContext Send Context */ - void sendMessage(BytesReference reference, ActionListener listener); + void sendMessage(OutboundHandler.SendContext sendContext); /** * Adds a listener that will be executed when the channel is connected. If the channel is still diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index ee99c7bb0d0..00892105859 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -72,9 +72,9 @@ public class InboundHandlerTests extends ESTestCase { channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address()); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {}); - TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage); OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool, - BigArrays.NON_RECYCLING_INSTANCE); + BigArrays.NON_RECYCLING_INSTANCE); + TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, outboundHandler::sendBytes); requestHandlers = new Transport.RequestHandlers(); responseHandlers = new Transport.ResponseHandlers(); handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index e9593fc6622..5790229c2e1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.concurrent.CompletableContext; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; @@ -88,9 +89,14 @@ public class FakeTcpChannel implements TcpChannel { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - messageCaptor.set(reference); - listenerCaptor.set(listener); + public void sendMessage(OutboundHandler.SendContext sendContext) { + try { + messageCaptor.set(sendContext.get()); + } catch (IOException e) { + sendContext.onFailure(e); + return; + } + listenerCaptor.set(sendContext); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 34cee1c4020..f01174721c3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -55,6 +55,7 @@ import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.InboundPipeline; +import org.elasticsearch.transport.OutboundHandler; import org.elasticsearch.transport.StatsTracker; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpServerChannel; @@ -365,8 +366,15 @@ public class MockNioTransport extends TcpTransport { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener)); + public void sendMessage(OutboundHandler.SendContext sendContext) { + final BytesReference message; + try { + message = sendContext.get(); + } catch (IOException e) { + sendContext.onFailure(e); + return; + } + getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext)); } } diff --git a/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ESXPackSmokeClientTestCase.java b/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ESXPackSmokeClientTestCase.java index e8d886330ae..d14972d5552 100644 --- a/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ESXPackSmokeClientTestCase.java +++ b/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ESXPackSmokeClientTestCase.java @@ -142,7 +142,7 @@ public abstract class ESXPackSmokeClientTestCase extends LuceneTestCase { } private void doClean() { - if (client != null) { + if (client != null && index != null) { try { client.admin().indices().prepareDelete(index).get(); } catch (Exception e) {