Serialize Monitoring Bulk Request Compressed (#56410) (#56442)

Even with changes from #48854 we're still seeing significant (as in tens and hundreds of MB)
buffer usage for bulk exports in some cases which destabilizes master nodes.
Since we need to know the serialized length of the bulk body we can't do the serialization
in a streaming manner. (also it's not easily doable with the HTTP client API we're using anyway).
=> let's at least serialize on heap in compressed form and decompress as we're streaming to the
HTTP connection. For small requests this adds negligible overhead but for large requests this reduces
the size of the payload field by about an order of magnitude (empirically determined) which is a massive reduction in size when considering O(100MB) bulk requests.
This commit is contained in:
Armin Braun 2020-05-08 23:16:07 +02:00 committed by GitHub
parent 95e5e9e598
commit 0a254cf223
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 55 additions and 8 deletions

View File

@ -15,6 +15,7 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.time.DateFormatter;
@ -28,7 +29,9 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Map;
@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk {
private final DateFormatter formatter;
/**
* The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
* The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
*/
private BytesReference payload = null;
/**
* Uncompressed length of {@link #payload} contents.
*/
private long payloadLength = -1L;
HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
super(name, threadContext);
@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk {
public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
try {
if (docs != null && docs.isEmpty() == false) {
try (BytesStreamOutput payload = new BytesStreamOutput()) {
final BytesStreamOutput scratch = new BytesStreamOutput();
final CountingOutputStream countingStream;
try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
countingStream = new CountingOutputStream(payload);
for (MonitoringDoc monitoringDoc : docs) {
writeDocument(monitoringDoc, payload);
writeDocument(monitoringDoc, countingStream);
}
// store the payload until we flush
this.payload = payload.bytes();
}
payloadLength = countingStream.bytesWritten;
// store the payload until we flush
this.payload = scratch.bytes();
}
} catch (Exception e) {
throw new ExportException("failed to add documents to export bulk [{}]", e, name);
@ -97,7 +108,8 @@ class HttpExportBulk extends ExportBulk {
request.addParameter(param.getKey(), param.getValue());
}
try {
request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
request.setEntity(new InputStreamEntity(
CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
} catch (IOException e) {
listener.onFailure(e);
return;
@ -127,7 +139,7 @@ class HttpExportBulk extends ExportBulk {
}
}
private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException {
final XContentType xContentType = XContentType.JSON;
final XContent xContent = xContentType.xContent();
@ -166,4 +178,39 @@ class HttpExportBulk extends ExportBulk {
name, index, id, doc.getType()
);
}
// Counting input stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream
private static final class CountingOutputStream extends FilterOutputStream {
private long bytesWritten = 0;
CountingOutputStream(final OutputStream out) {
super(out);
}
@Override
public void write(final int b) throws IOException {
out.write(b);
count(1);
}
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
out.write(b, off, len);
count(len);
}
@Override
public void close() {
// don't close nested stream
}
protected void count(final long written) {
if (written != -1) {
bytesWritten += written;
}
}
}
}