The `HttpExportBulk` exporter uses far more memory than it needs to, because it allocates intermediate buffers for serialization and IO:

* Remove the copy of all bytes when flushing; use the stream wrapper over the payload instead.
* Remove the copying step that turned the `BytesStreamOutput` into a `byte[]`. This also avoids allocating a single huge `byte[]` and instead makes use of the internal paging logic of the `BytesStreamOutput`.
* Don't allocate a new `BytesStreamOutput` for every document; keep appending to a single one.

A short sketch of the flush-side change follows the commit metadata below; the full diff comes after it.
Commit ea9f094e75 (parent 0645ee88e2)
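To make the difference concrete, here is a minimal sketch contrasting the flattening copy the commit removes with the streaming path it adds. This is not code from the commit: the `PayloadSketch` class and the literal bulk body are illustrative, and the Elasticsearch core and Apache HttpCore libraries are assumed to be on the classpath.

```java
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class PayloadSketch {
    public static void main(String[] args) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            // Pretend this is a large bulk body, written document by document.
            out.write("{\"index\":{}}\n{\"field\":\"value\"}\n".getBytes(StandardCharsets.UTF_8));

            // Before: flatten the paged buffer into one contiguous array.
            // For a big bulk body this allocates and fills a single huge byte[].
            byte[] flattened = BytesReference.toBytes(out.bytes());

            // After: keep the BytesReference, a view over the output's
            // internal pages; no flattening copy is made.
            BytesReference payload = out.bytes();

            // The view streams straight into the HTTP entity, so the request
            // body is read out page by page instead of from one big array.
            InputStreamEntity entity =
                new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON);

            System.out.println("flattened copy: " + flattened.length + " bytes, "
                + "streamed entity length: " + entity.getContentLength());
        }
    }
}
```

Since `out.bytes()` is only a view over the output's internal pages, peak memory during flush drops by roughly the size of the bulk body, and the `InputStreamEntity` reads those pages directly when the request is sent.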
The diff to `HttpExportBulk.java`:

```diff
@@ -6,12 +6,9 @@
 package org.elasticsearch.xpack.monitoring.exporter.http;
 
 import org.apache.http.entity.ContentType;
-import org.apache.http.nio.entity.NByteArrayEntity;
+import org.apache.http.entity.InputStreamEntity;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
@@ -19,11 +16,12 @@ import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
@@ -60,7 +58,7 @@ class HttpExportBulk extends ExportBulk {
     /**
      * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
      */
-    private byte[] payload = null;
+    private BytesReference payload = null;
 
     HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
                    final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
@@ -77,12 +75,11 @@
             if (docs != null && docs.isEmpty() == false) {
                 try (BytesStreamOutput payload = new BytesStreamOutput()) {
                     for (MonitoringDoc monitoringDoc : docs) {
-                        // any failure caused by an individual doc will be written as an empty byte[], thus not impacting the rest
-                        payload.write(toBulkBytes(monitoringDoc));
+                        writeDocument(monitoringDoc, payload);
                     }
 
                     // store the payload until we flush
-                    this.payload = BytesReference.toBytes(payload.bytes());
+                    this.payload = payload.bytes();
                 }
             }
         } catch (Exception e) {
@@ -94,12 +91,19 @@
     public void doFlush(ActionListener<Void> listener) throws ExportException {
         if (payload == null) {
             listener.onFailure(new ExportException("unable to send documents because none were loaded for export bulk [{}]", name));
-        } else if (payload.length != 0) {
+        } else if (payload.length() != 0) {
            final Request request = new Request("POST", "/_bulk");
            for (Map.Entry<String, String> param : params.entrySet()) {
                request.addParameter(param.getKey(), param.getValue());
            }
-            request.setEntity(new NByteArrayEntity(payload, ContentType.APPLICATION_JSON));
+            try {
+                request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
+            } catch (IOException e) {
+                listener.onFailure(e);
+                return;
+            }
+            // null out serialized docs to make things easier on the GC
+            payload = null;
 
            client.performRequestAsync(request, new ResponseListener() {
                @Override
@@ -123,51 +127,43 @@
         }
     }
 
-    private byte[] toBulkBytes(final MonitoringDoc doc) throws IOException {
+    private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
         final XContentType xContentType = XContentType.JSON;
         final XContent xContent = xContentType.xContent();
 
         final String index = MonitoringTemplateUtils.indexName(formatter, doc.getSystem(), doc.getTimestamp());
         final String id = doc.getId();
 
-        try (BytesStreamOutput out = new BytesStreamOutput()) {
-            try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
-                // Builds the bulk action metadata line
-                builder.startObject();
+        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
+            // Builds the bulk action metadata line
+            builder.startObject();
+            {
+                builder.startObject("index");
                 {
-                    builder.startObject("index");
-                    {
-                        builder.field("_index", index);
-                        if (id != null) {
-                            builder.field("_id", id);
-                        }
+                    builder.field("_index", index);
+                    if (id != null) {
+                        builder.field("_id", id);
                     }
-                    builder.endObject();
                 }
                 builder.endObject();
             }
+            builder.endObject();
+        }
 
-            // Adds action metadata line bulk separator
-            out.write(xContent.streamSeparator());
+        // Adds action metadata line bulk separator
+        out.write(xContent.streamSeparator());
 
-            // Adds the source of the monitoring document
-            final BytesRef source = XContentHelper.toXContent(doc, xContentType, false).toBytesRef();
-            out.write(source.bytes, source.offset, source.length);
+        // Adds the source of the monitoring document
+        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
+            doc.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        }
 
-            // Adds final bulk separator
-            out.write(xContent.streamSeparator());
+        // Adds final bulk separator
+        out.write(xContent.streamSeparator());
 
-            logger.trace(
-                "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]",
-                name, index, id, doc.getType()
-            );
-
-            return BytesReference.toBytes(out.bytes());
-        } catch (Exception e) {
-            logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to render document [{}], skipping it [{}]", doc, name), e);
-
-            return BytesRef.EMPTY_BYTES;
-        }
+        logger.trace(
+            "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]",
+            name, index, id, doc.getType()
+        );
     }
 }
```
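The document-serialization side works the same way: instead of rendering each document into its own buffer (the old `toBulkBytes`) and copying the result into the shared output, the new `writeDocument` renders each bulk line directly into the shared stream. Below is a minimal, self-contained sketch of that pattern; the class and helper names are illustrative, not from the commit, and the same Elasticsearch libraries are assumed.

```java
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;

public class BulkLineSketch {
    // Append one bulk item (action metadata line + source line) to a shared output.
    // Closing the XContentBuilder flushes what it wrote; the underlying stream
    // stays open for the next document, so no per-document buffer is needed.
    static void appendItem(BytesStreamOutput out, String index, String field, String value) throws IOException {
        final XContent xContent = XContentType.JSON.xContent();

        // Action metadata line: {"index":{"_index":"..."}}
        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
            builder.startObject();
            builder.startObject("index");
            builder.field("_index", index);
            builder.endObject();
            builder.endObject();
        }
        out.write(xContent.streamSeparator()); // newline between metadata and source

        // Source line, rendered straight into the same stream.
        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
            builder.startObject();
            builder.field(field, value);
            builder.endObject();
        }
        out.write(xContent.streamSeparator()); // newline terminating the item
    }

    public static void main(String[] args) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            appendItem(out, "monitoring", "message", "doc-1");
            appendItem(out, "monitoring", "message", "doc-2");
            System.out.println(out.bytes().utf8ToString());
        }
    }
}
```

One consequence visible in the diff: because nothing is rendered per document anymore, the old per-document catch block (which skipped a failing document by emitting an empty `byte[]`) is gone, so a document that fails to serialize now fails the whole bulk add instead of being silently skipped.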