From af2e2782eb1ca8d7564a5d7e8d8e2d1d16782a97 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 24 Aug 2020 15:49:15 +0200 Subject: [PATCH] Stop Needlessly Copying Bytes in XContent Parsing (#61447) (#61469) Wrapping a `BytesArray` in a `StreamInput` for deserialization is inefficient. This forces Jackson to internally buffer (i.e. copy) all bytes from the `BytesArray` before deserializing, adding overhead for copying the bytes and managing the buffers. This commit fixes a number of spots where `BytesArray` is the most common type of `BytesReference` to special case this type and parse it more efficiently. Also improves parsing `String`s to use the more efficient direct `String` parsing APIs. --- .../common/xcontent/cbor/CborXContent.java | 3 +- .../common/xcontent/json/JsonXContent.java | 3 +- .../common/xcontent/smile/SmileXContent.java | 3 +- .../common/xcontent/yaml/YamlXContent.java | 3 +- .../action/bulk/BulkRequestParser.java | 48 +++++++++++---- .../common/xcontent/XContentHelper.java | 61 ++++++++++++++++--- .../index/IndexingSlowLogTests.java | 6 +- .../org/elasticsearch/test/ESTestCase.java | 11 +++- 8 files changed, 103 insertions(+), 35 deletions(-) diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/cbor/CborXContent.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/cbor/CborXContent.java index 774427d401e..7d298fd6246 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/cbor/CborXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/cbor/CborXContent.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.util.Set; /** @@ -81,7 +80,7 @@ public class CborXContent implements XContent { @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, String content) throws IOException { - return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(new StringReader(content))); + return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(content)); } @Override diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java index 99df76c437d..6dfe31b296f 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.util.Set; /** @@ -82,7 +81,7 @@ public class JsonXContent implements XContent { @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, String content) throws IOException { - return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(new StringReader(content))); + return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(content)); } @Override diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java index c2917ad7a0f..1461b4ca2dd 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.util.Set; /** @@ -83,7 +82,7 @@ public class SmileXContent implements XContent { @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, String content) throws IOException { - return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(new StringReader(content))); + return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(content)); } @Override diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/yaml/YamlXContent.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/yaml/YamlXContent.java index 64615447659..42cc66682bc 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/yaml/YamlXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/yaml/YamlXContent.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.util.Set; /** @@ -76,7 +75,7 @@ public class YamlXContent implements XContent { @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, String content) throws IOException { - return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(new StringReader(content))); + return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(content)); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 1667a5767a7..792d57714c6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; @@ -40,7 +41,6 @@ import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; -import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; @@ -96,7 +96,7 @@ public final class BulkRequestParser { * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored */ private static BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, - XContentType xContentType) { + XContentType xContentType) { final int length; if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') { length = nextMarker - from - 1; @@ -156,10 +156,7 @@ public final class BulkRequestParser { line++; // now parse the action - // EMPTY is safe here because we never call namedObject - try (InputStream stream = data.slice(from, nextMarker - from).streamInput(); - XContentParser parser = xContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + try (XContentParser parser = createParser(data, xContent, from, nextMarker)) { // move pointers from = nextMarker + 1; @@ -204,7 +201,7 @@ public final class BulkRequestParser { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token.isValue()) { - if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){ + if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) { if (!allowExplicitIndex) { throw new IllegalArgumentException("explicit index in bulk is not allowed"); } @@ -299,10 +296,8 @@ public final class BulkRequestParser { .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .setRequireAlias(requireAlias) .routing(routing); - // EMPTY is safe here because we never call namedObject - try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); - XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, dataStream)) { + try (XContentParser sliceParser = createParser( + sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent)) { updateRequest.fromXContent(sliceParser); } if (fetchSourceContext != null) { @@ -322,4 +317,35 @@ public final class BulkRequestParser { } } + private static XContentParser createParser(BytesReference data, XContent xContent) throws IOException { + if (data instanceof BytesArray) { + return parseBytesArray(xContent, (BytesArray) data, 0, data.length()); + } else { + return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, data.streamInput()); + } + } + + // Create an efficient parser of the given bytes, trying to directly parse a byte array if possible and falling back to stream wrapping + // otherwise. + private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker) throws IOException { + if (data instanceof BytesArray) { + return parseBytesArray(xContent, (BytesArray) data, from, nextMarker); + } else { + final int length = nextMarker - from; + final BytesReference slice = data.slice(from, length); + if (slice instanceof BytesArray) { + return parseBytesArray(xContent, (BytesArray) slice, 0, length); + } else { + // EMPTY is safe here because we never call namedObject + return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, slice.streamInput()); + } + } + } + + private static XContentParser parseBytesArray(XContent xContent, BytesArray array, int from, int nextMarker) throws IOException { + final int offset = array.offset(); + // EMPTY is safe here because we never call namedObject + return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(), + offset + from, nextMarker - from); + } } diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index f5f67e8e6de..2dd7eba2c92 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -76,6 +76,11 @@ public class XContentHelper { } return XContentFactory.xContent(xContentType).createParser(xContentRegistry, deprecationHandler, compressedInput); } else { + if (bytes instanceof BytesArray) { + final BytesArray array = (BytesArray) bytes; + return xContentType.xContent().createParser( + xContentRegistry, deprecationHandler, array.array(), array.offset(), array.length()); + } return xContentType.xContent().createParser(xContentRegistry, deprecationHandler, bytes.streamInput()); } } @@ -106,10 +111,19 @@ public class XContentHelper { compressedStreamInput = new BufferedInputStream(compressedStreamInput); } input = compressedStreamInput; + contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input); + } else if (bytes instanceof BytesArray) { + final BytesArray arr = (BytesArray) bytes; + final byte[] raw = arr.array(); + final int offset = arr.offset(); + final int length = arr.length(); + contentType = xContentType != null ? xContentType : XContentFactory.xContentType(raw, offset, length); + return new Tuple<>(Objects.requireNonNull(contentType), + convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered)); } else { input = bytes.streamInput(); + contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input); } - contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input); try (InputStream stream = input) { return new Tuple<>(Objects.requireNonNull(contentType), convertToMap(XContentFactory.xContent(contentType), stream, ordered)); @@ -148,6 +162,21 @@ public class XContentHelper { } } + /** + * Convert a byte array in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any + * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input. + */ + public static Map convertToMap(XContent xContent, byte[] bytes, int offset, int length, boolean ordered) + throws ElasticsearchParseException { + // It is safe to use EMPTY here because this never uses namedObject + try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes, offset, length)) { + return ordered ? parser.mapOrdered() : parser.map(); + } catch (IOException e) { + throw new ElasticsearchParseException("Failed to parse content to map", e); + } + } + @Deprecated public static String convertToJson(BytesReference bytes, boolean reformatJson) throws IOException { return convertToJson(bytes, reformatJson, false); @@ -183,19 +212,31 @@ public class XContentHelper { } // It is safe to use EMPTY here because this never uses namedObject - try (InputStream stream = bytes.streamInput(); - XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) { - parser.nextToken(); - XContentBuilder builder = XContentFactory.jsonBuilder(); - if (prettyPrint) { - builder.prettyPrint(); + if (bytes instanceof BytesArray) { + final BytesArray array = (BytesArray) bytes; + try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, array.array(), array.offset(), array.length())) { + return toJsonString(prettyPrint, parser); + } + } else { + try (InputStream stream = bytes.streamInput(); + XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) { + return toJsonString(prettyPrint, parser); } - builder.copyCurrentStructure(parser); - return Strings.toString(builder); } } + private static String toJsonString(boolean prettyPrint, XContentParser parser) throws IOException { + parser.nextToken(); + XContentBuilder builder = XContentFactory.jsonBuilder(); + if (prettyPrint) { + builder.prettyPrint(); + } + builder.copyCurrentStructure(parser); + return Strings.toString(builder); + } + /** * Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source * unless both are Maps, in which case it recursively updated it. diff --git a/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java index 34f6551d85c..f4d972be0f4 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java @@ -254,12 +254,12 @@ public class IndexingSlowLogTests extends ESTestCase { () -> new IndexingSlowLogMessage(index, doc, 10, true, 3)); assertThat(e, hasToString(containsString("_failed_to_convert_[Unrecognized token 'invalid':" + " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\\n" - + " at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper)"))); + + " at [Source: "))); assertNotNull(e.getCause()); assertThat(e.getCause(), instanceOf(JsonParseException.class)); assertThat(e.getCause(), hasToString(containsString("Unrecognized token 'invalid':" - + " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" - + " at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper)"))); + + " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" + + " at [Source: "))); } public void testReformatSetting() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index a4727e1c843..d1f0e11a7f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; @@ -1283,8 +1284,7 @@ public abstract class ESTestCase extends LuceneTestCase { * Create a new {@link XContentParser}. */ protected final XContentParser createParser(XContentBuilder builder) throws IOException { - return builder.generator().contentType().xContent() - .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput()); + return createParser(builder.contentType().xContent(), BytesReference.bytes(builder)); } /** @@ -1312,7 +1312,7 @@ public abstract class ESTestCase extends LuceneTestCase { * Create a new {@link XContentParser}. */ protected final XContentParser createParser(XContent xContent, BytesReference data) throws IOException { - return xContent.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, data.streamInput()); + return createParser(xContentRegistry(), xContent, data); } /** @@ -1320,6 +1320,11 @@ public abstract class ESTestCase extends LuceneTestCase { */ protected final XContentParser createParser(NamedXContentRegistry namedXContentRegistry, XContent xContent, BytesReference data) throws IOException { + if (data instanceof BytesArray) { + final BytesArray array = (BytesArray) data; + return xContent.createParser( + namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, array.array(), array.offset(), array.length()); + } return xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, data.streamInput()); }