Serialize Outbound Messages on IO Threads (#56961) (#57080)

Almost every outbound message is serialized to buffers of 16k pagesize.
We were serializing these messages off the IO loop (and retaining the concrete message
instance as well) and would then enqueue it on the IO loop to be dealt with as soon as the
channel is ready.
1. This would cause buffers to be held onto for longer than necessary, causing less reuse on average.
2. If a channel was slow for some reason, not only would concrete message instances queue up for it, but also 16k of buffers would be reserved for each message until it would be written+flushed physically.

With this change, the serialization happens on the event loop which effectively limits the number of buffers that `N` IO-threads will ever use so long as messages are small and channels writable.
Also, this change dereferences the reference to the concrete outbound message as soon as it has been serialized to save some more on GC.

This reduces the GC time for a default PMC run by about 50% in experiments (3 nodes, 2G heap each, loopback ... obvious caveat is that GC isn't that heavy in the first place with recent changes but still a measurable gain).
I also expect it to be helpful for master node stability by causing less of a spike if master is e.g. hit by a large number of requests that are processed batched (e.g. shard snapshot status updates) and responded to in a short time frame all at once.

Obviously, the downside to this change is that it introduces more latency on the IO loop for the serialization. But since we read all of these messages on the IO loop as well I don't see it as much of a qualitative change really and the more predictable buffer use seems much more valuable relatively.
This commit is contained in:
Armin Braun 2020-06-02 16:15:18 +02:00 committed by GitHub
parent 808835ac1c
commit ba2d70d8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 65 additions and 39 deletions

View File

@ -33,9 +33,11 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.Transports;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
@ -88,9 +90,10 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
assert msg instanceof ByteBuf;
final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
assert msg instanceof OutboundHandler.SendContext;
final boolean queued = queuedWrites.offer(
new WriteOperation(Netty4Utils.toByteBuf(((OutboundHandler.SendContext) msg).get()), promise));
assert queued;
}

View File

@ -25,8 +25,8 @@ import io.netty.channel.ChannelPromise;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;
@ -142,11 +142,11 @@ public class Netty4TcpChannel implements TcpChannel {
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));
public void sendMessage(OutboundHandler.SendContext sendContext) {
channel.writeAndFlush(sendContext, addPromise(sendContext, channel));
if (channel.eventLoop().isShutdown()) {
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
sendContext.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
}

View File

@ -22,8 +22,10 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.TcpChannel;
import java.io.IOException;
import java.nio.channels.SocketChannel;
public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
@ -38,8 +40,16 @@ public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
this.profile = profile;
}
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
@Override
public void sendMessage(OutboundHandler.SendContext sendContext) {
final BytesReference message;
try {
message = sendContext.get();
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
}
@Override

View File

@ -159,7 +159,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
}
private void doClean() {
if (client != null) {
if (client != null && index != null) {
try {
client.admin().indices().prepareDelete(index).get();
} catch (Exception e) {

View File

@ -41,7 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Set;
final class OutboundHandler {
public final class OutboundHandler {
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
@ -65,12 +65,7 @@ final class OutboundHandler {
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
try {
internalSend(channel, sendContext);
} catch (IOException e) {
// This should not happen as the bytes are already serialized
throw new AssertionError(e);
}
internalSend(channel, sendContext);
}
/**
@ -124,11 +119,10 @@ final class OutboundHandler {
internalSend(channel, sendContext);
}
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
private void internalSend(TcpChannel channel, SendContext sendContext) {
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
BytesReference reference = sendContext.get();
try {
channel.sendMessage(reference, sendContext);
channel.sendMessage(sendContext);
} catch (RuntimeException ex) {
sendContext.onFailure(ex);
CloseableChannel.closeChannel(channel);
@ -147,7 +141,7 @@ final class OutboundHandler {
private static class MessageSerializer implements CheckedSupplier<BytesReference, IOException>, Releasable {
private final OutboundMessage message;
private OutboundMessage message;
private final BigArrays bigArrays;
private volatile ReleasableBytesStreamOutput bytesStreamOutput;
@ -158,8 +152,12 @@ final class OutboundHandler {
@Override
public BytesReference get() throws IOException {
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
return message.serialize(bytesStreamOutput);
try {
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
return message.serialize(bytesStreamOutput);
} finally {
message = null;
}
}
@Override
@ -168,10 +166,10 @@ final class OutboundHandler {
}
}
private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
public class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
private final TcpChannel channel;
private final CheckedSupplier<BytesReference, IOException> messageSupplier;
private CheckedSupplier<BytesReference, IOException> messageSupplier;
private final ActionListener<Void> listener;
private final Releasable optionalReleasable;
private long messageSize = -1;
@ -189,10 +187,13 @@ final class OutboundHandler {
this.optionalReleasable = optionalReleasable;
}
@Override
public BytesReference get() throws IOException {
BytesReference message;
try {
assert messageSupplier != null;
message = messageSupplier.get();
messageSupplier = null;
messageSize = message.length();
TransportLogger.logOutboundMessage(channel, message);
return message;
@ -211,6 +212,7 @@ final class OutboundHandler {
@Override
protected void innerOnFailure(Exception e) {
messageSupplier = null;
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e);
} else {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.unit.TimeValue;
@ -59,13 +58,11 @@ public interface TcpChannel extends CloseableChannel {
InetSocketAddress getRemoteAddress();
/**
* Sends a tcp message to the channel. The listener will be executed once the send process has been
* completed.
* Sends a tcp message to the channel.
*
* @param reference to send to channel
* @param listener to execute upon send completion
* @param sendContext Send Context
*/
void sendMessage(BytesReference reference, ActionListener<Void> listener);
void sendMessage(OutboundHandler.SendContext sendContext);
/**
* Adds a listener that will be executed when the channel is connected. If the channel is still

View File

@ -72,9 +72,9 @@ public class InboundHandlerTests extends ESTestCase {
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {});
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool,
BigArrays.NON_RECYCLING_INSTANCE);
BigArrays.NON_RECYCLING_INSTANCE);
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, outboundHandler::sendBytes);
requestHandlers = new Transport.RequestHandlers();
responseHandlers = new Transport.ResponseHandlers();
handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers,

View File

@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
@ -88,9 +89,14 @@ public class FakeTcpChannel implements TcpChannel {
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
messageCaptor.set(reference);
listenerCaptor.set(listener);
public void sendMessage(OutboundHandler.SendContext sendContext) {
try {
messageCaptor.set(sendContext.get());
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
listenerCaptor.set(sendContext);
}
@Override

View File

@ -55,6 +55,7 @@ import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.StatsTracker;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
@ -365,8 +366,15 @@ public class MockNioTransport extends TcpTransport {
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
public void sendMessage(OutboundHandler.SendContext sendContext) {
final BytesReference message;
try {
message = sendContext.get();
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
}
}

View File

@ -142,7 +142,7 @@ public abstract class ESXPackSmokeClientTestCase extends LuceneTestCase {
}
private void doClean() {
if (client != null) {
if (client != null && index != null) {
try {
client.admin().indices().prepareDelete(index).get();
} catch (Exception e) {