Wrapping a `BytesArray` in a `StreamInput` for deserialization is inefficient. This forces Jackson to internally buffer (i.e. copy) all bytes from the `BytesArray` before deserializing, adding overhead for copying the bytes and managing the buffers. This commit fixes a number of spots where `BytesArray` is the most common type of `BytesReference` to special case this type and parse it more efficiently. Also improves parsing `String`s to use the more efficient direct `String` parsing APIs.
This commit is contained in:
parent
3373b1406a
commit
af2e2782eb
|
@ -36,7 +36,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -81,7 +80,7 @@ public class CborXContent implements XContent {
|
|||
@Override
|
||||
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
|
||||
DeprecationHandler deprecationHandler, String content) throws IOException {
|
||||
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(new StringReader(content)));
|
||||
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(content));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,7 +35,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -82,7 +81,7 @@ public class JsonXContent implements XContent {
|
|||
@Override
|
||||
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
|
||||
DeprecationHandler deprecationHandler, String content) throws IOException {
|
||||
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(new StringReader(content)));
|
||||
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(content));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,7 +36,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -83,7 +82,7 @@ public class SmileXContent implements XContent {
|
|||
@Override
|
||||
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
|
||||
DeprecationHandler deprecationHandler, String content) throws IOException {
|
||||
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(new StringReader(content)));
|
||||
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(content));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -76,7 +75,7 @@ public class YamlXContent implements XContent {
|
|||
@Override
|
||||
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
|
||||
DeprecationHandler deprecationHandler, String content) throws IOException {
|
||||
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(new StringReader(content)));
|
||||
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(content));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
|
@ -40,7 +41,6 @@ import org.elasticsearch.rest.action.document.RestBulkAction;
|
|||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -96,7 +96,7 @@ public final class BulkRequestParser {
|
|||
* if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
|
||||
*/
|
||||
private static BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker,
|
||||
XContentType xContentType) {
|
||||
XContentType xContentType) {
|
||||
final int length;
|
||||
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
|
||||
length = nextMarker - from - 1;
|
||||
|
@ -156,10 +156,7 @@ public final class BulkRequestParser {
|
|||
line++;
|
||||
|
||||
// now parse the action
|
||||
// EMPTY is safe here because we never call namedObject
|
||||
try (InputStream stream = data.slice(from, nextMarker - from).streamInput();
|
||||
XContentParser parser = xContent
|
||||
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
|
||||
try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
|
||||
// move pointers
|
||||
from = nextMarker + 1;
|
||||
|
||||
|
@ -204,7 +201,7 @@ public final class BulkRequestParser {
|
|||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){
|
||||
if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||
if (!allowExplicitIndex) {
|
||||
throw new IllegalArgumentException("explicit index in bulk is not allowed");
|
||||
}
|
||||
|
@ -299,10 +296,8 @@ public final class BulkRequestParser {
|
|||
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
|
||||
.setRequireAlias(requireAlias)
|
||||
.routing(routing);
|
||||
// EMPTY is safe here because we never call namedObject
|
||||
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
|
||||
XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, dataStream)) {
|
||||
try (XContentParser sliceParser = createParser(
|
||||
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent)) {
|
||||
updateRequest.fromXContent(sliceParser);
|
||||
}
|
||||
if (fetchSourceContext != null) {
|
||||
|
@ -322,4 +317,35 @@ public final class BulkRequestParser {
|
|||
}
|
||||
}
|
||||
|
||||
private static XContentParser createParser(BytesReference data, XContent xContent) throws IOException {
|
||||
if (data instanceof BytesArray) {
|
||||
return parseBytesArray(xContent, (BytesArray) data, 0, data.length());
|
||||
} else {
|
||||
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, data.streamInput());
|
||||
}
|
||||
}
|
||||
|
||||
// Create an efficient parser of the given bytes, trying to directly parse a byte array if possible and falling back to stream wrapping
|
||||
// otherwise.
|
||||
private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker) throws IOException {
|
||||
if (data instanceof BytesArray) {
|
||||
return parseBytesArray(xContent, (BytesArray) data, from, nextMarker);
|
||||
} else {
|
||||
final int length = nextMarker - from;
|
||||
final BytesReference slice = data.slice(from, length);
|
||||
if (slice instanceof BytesArray) {
|
||||
return parseBytesArray(xContent, (BytesArray) slice, 0, length);
|
||||
} else {
|
||||
// EMPTY is safe here because we never call namedObject
|
||||
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, slice.streamInput());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static XContentParser parseBytesArray(XContent xContent, BytesArray array, int from, int nextMarker) throws IOException {
|
||||
final int offset = array.offset();
|
||||
// EMPTY is safe here because we never call namedObject
|
||||
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
|
||||
offset + from, nextMarker - from);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,11 @@ public class XContentHelper {
|
|||
}
|
||||
return XContentFactory.xContent(xContentType).createParser(xContentRegistry, deprecationHandler, compressedInput);
|
||||
} else {
|
||||
if (bytes instanceof BytesArray) {
|
||||
final BytesArray array = (BytesArray) bytes;
|
||||
return xContentType.xContent().createParser(
|
||||
xContentRegistry, deprecationHandler, array.array(), array.offset(), array.length());
|
||||
}
|
||||
return xContentType.xContent().createParser(xContentRegistry, deprecationHandler, bytes.streamInput());
|
||||
}
|
||||
}
|
||||
|
@ -106,10 +111,19 @@ public class XContentHelper {
|
|||
compressedStreamInput = new BufferedInputStream(compressedStreamInput);
|
||||
}
|
||||
input = compressedStreamInput;
|
||||
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
|
||||
} else if (bytes instanceof BytesArray) {
|
||||
final BytesArray arr = (BytesArray) bytes;
|
||||
final byte[] raw = arr.array();
|
||||
final int offset = arr.offset();
|
||||
final int length = arr.length();
|
||||
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(raw, offset, length);
|
||||
return new Tuple<>(Objects.requireNonNull(contentType),
|
||||
convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered));
|
||||
} else {
|
||||
input = bytes.streamInput();
|
||||
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
|
||||
}
|
||||
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
|
||||
try (InputStream stream = input) {
|
||||
return new Tuple<>(Objects.requireNonNull(contentType),
|
||||
convertToMap(XContentFactory.xContent(contentType), stream, ordered));
|
||||
|
@ -148,6 +162,21 @@ public class XContentHelper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a byte array in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
|
||||
* error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
|
||||
*/
|
||||
public static Map<String, Object> convertToMap(XContent xContent, byte[] bytes, int offset, int length, boolean ordered)
|
||||
throws ElasticsearchParseException {
|
||||
// It is safe to use EMPTY here because this never uses namedObject
|
||||
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes, offset, length)) {
|
||||
return ordered ? parser.mapOrdered() : parser.map();
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchParseException("Failed to parse content to map", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static String convertToJson(BytesReference bytes, boolean reformatJson) throws IOException {
|
||||
return convertToJson(bytes, reformatJson, false);
|
||||
|
@ -183,19 +212,31 @@ public class XContentHelper {
|
|||
}
|
||||
|
||||
// It is safe to use EMPTY here because this never uses namedObject
|
||||
try (InputStream stream = bytes.streamInput();
|
||||
XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
|
||||
parser.nextToken();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
if (prettyPrint) {
|
||||
builder.prettyPrint();
|
||||
if (bytes instanceof BytesArray) {
|
||||
final BytesArray array = (BytesArray) bytes;
|
||||
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, array.array(), array.offset(), array.length())) {
|
||||
return toJsonString(prettyPrint, parser);
|
||||
}
|
||||
} else {
|
||||
try (InputStream stream = bytes.streamInput();
|
||||
XContentParser parser = XContentFactory.xContent(xContentType).createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
|
||||
return toJsonString(prettyPrint, parser);
|
||||
}
|
||||
builder.copyCurrentStructure(parser);
|
||||
return Strings.toString(builder);
|
||||
}
|
||||
}
|
||||
|
||||
private static String toJsonString(boolean prettyPrint, XContentParser parser) throws IOException {
|
||||
parser.nextToken();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
if (prettyPrint) {
|
||||
builder.prettyPrint();
|
||||
}
|
||||
builder.copyCurrentStructure(parser);
|
||||
return Strings.toString(builder);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source
|
||||
* unless both are Maps, in which case it recursively updated it.
|
||||
|
|
|
@ -254,12 +254,12 @@ public class IndexingSlowLogTests extends ESTestCase {
|
|||
() -> new IndexingSlowLogMessage(index, doc, 10, true, 3));
|
||||
assertThat(e, hasToString(containsString("_failed_to_convert_[Unrecognized token 'invalid':"
|
||||
+ " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\\n"
|
||||
+ " at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper)")));
|
||||
+ " at [Source: ")));
|
||||
assertNotNull(e.getCause());
|
||||
assertThat(e.getCause(), instanceOf(JsonParseException.class));
|
||||
assertThat(e.getCause(), hasToString(containsString("Unrecognized token 'invalid':"
|
||||
+ " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n"
|
||||
+ " at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper)")));
|
||||
+ " was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n"
|
||||
+ " at [Source: ")));
|
||||
}
|
||||
|
||||
public void testReformatSetting() {
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.io.PathUtilsForTesting;
|
||||
|
@ -1283,8 +1284,7 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
* Create a new {@link XContentParser}.
|
||||
*/
|
||||
protected final XContentParser createParser(XContentBuilder builder) throws IOException {
|
||||
return builder.generator().contentType().xContent()
|
||||
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
|
||||
return createParser(builder.contentType().xContent(), BytesReference.bytes(builder));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1312,7 +1312,7 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
* Create a new {@link XContentParser}.
|
||||
*/
|
||||
protected final XContentParser createParser(XContent xContent, BytesReference data) throws IOException {
|
||||
return xContent.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, data.streamInput());
|
||||
return createParser(xContentRegistry(), xContent, data);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1320,6 +1320,11 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
*/
|
||||
protected final XContentParser createParser(NamedXContentRegistry namedXContentRegistry, XContent xContent,
|
||||
BytesReference data) throws IOException {
|
||||
if (data instanceof BytesArray) {
|
||||
final BytesArray array = (BytesArray) data;
|
||||
return xContent.createParser(
|
||||
namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, array.array(), array.offset(), array.length());
|
||||
}
|
||||
return xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, data.streamInput());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue