From 7f3769c7453758d6583873d62aa8a78f15c20487 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Wed, 8 Feb 2017 11:55:50 -0500 Subject: [PATCH] Remove ldjson support and document ndjson for bulk/msearch (#23049) This commit removes support for the `application/x-ldjson` Content-Type header as this was only used in the first draft of the spec and had very little uptake. Additionally, the docs for bulk and msearch have been updated to specifically call out ndjson and mention that the newline character may be preceded by a carriage return. Finally, the bulk request handling of the carriage return has been improved to remove this character from the source. Closes #23025 --- .../action/bulk/BulkRequest.java | 24 +++++++++++++++---- .../elasticsearch/rest/RestController.java | 6 ++--- .../org/elasticsearch/rest/RestRequest.java | 2 -- .../action/bulk/BulkRequestTests.java | 8 +------ .../rest/RestControllerTests.java | 2 +- docs/reference/docs/bulk.asciidoc | 8 ++++--- docs/reference/search/multi-search.asciidoc | 11 ++++++--- 7 files changed, 37 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 371659586f9..30d2f4d1fc8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -438,23 +438,25 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques if ("index".equals(action)) { if (opType == null) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType) - .setPipeline(pipeline).source(data.slice(from, nextMarker - from), xContentType), payload); + .setPipeline(pipeline) + .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload); } else { internalAdd(new IndexRequest(index, type, 
id).routing(routing).parent(parent).version(version).versionType(versionType) .create("create".equals(opType)).setPipeline(pipeline) - .source(data.slice(from, nextMarker - from), xContentType), payload); + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } } else if ("create".equals(action)) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType) .create(true).setPipeline(pipeline) - .source(data.slice(from, nextMarker - from), xContentType), payload); + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } else if ("update".equals(action)) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict) .version(version).versionType(versionType) .routing(routing) .parent(parent); // EMPTY is safe here because we never call namedObject - try (XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, data.slice(from, nextMarker - from))) { + try (XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, + sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType))) { updateRequest.fromXContent(sliceParser); } if (fetchSourceContext != null) { @@ -485,6 +487,20 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return this; } + /** + * Returns the sliced {@link BytesReference}. 
If the {@link XContentType} is JSON, the byte preceding the marker is checked to see + * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored + */ + private BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, XContentType xContentType) { + final int length; + if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') { + length = nextMarker - from - 1; + } else { + length = nextMarker - from; + } + return bytesReference.slice(from, length); + } + /** * Sets the number of shard copies that must be active before proceeding with the write. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index ae80b7129b6..cddd261da85 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -249,10 +249,8 @@ public class RestController extends AbstractComponent { "in a supported format."); } else if (restHandler != null && restHandler.supportsContentStream() && restRequest.header("Content-Type") != null) { final String lowercaseMediaType = restRequest.header("Content-Type").toLowerCase(Locale.ROOT); - // we also support line-delimited JSON, which isn't official and has a few variations - // http://specs.okfnlabs.org/ndjson/ - // https://github.com/ndjson/ndjson-spec/blob/48ea03cea6796b614cfbff4d4eb921f0b1d35c26/specification.md - if (lowercaseMediaType.equals("application/x-ldjson") || lowercaseMediaType.equals("application/x-ndjson")) { + // we also support newline delimited JSON: http://specs.okfnlabs.org/ndjson/ + if (lowercaseMediaType.equals("application/x-ndjson")) { restRequest.setXContentType(XContentType.JSON); } else if (isContentTypeRequired) { return false; diff --git 
a/core/src/main/java/org/elasticsearch/rest/RestRequest.java b/core/src/main/java/org/elasticsearch/rest/RestRequest.java index 7d41faf12e0..826f57e41b3 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/core/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -181,9 +181,7 @@ public abstract class RestRequest implements ToXContent.Params { /** * Sets the {@link XContentType} - * @deprecated this is only used to allow BWC with content-type detection */ - @Deprecated final void setXContentType(XContentType xContentType) { this.xContentType.set(xContentType); } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 95a217cd6cd..15cb43d89a9 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.bulk; -import org.apache.lucene.util.Constants; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; @@ -28,7 +27,6 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.common.ParsingException; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -57,10 +55,6 @@ import static org.hamcrest.Matchers.notNullValue; public class BulkRequestTests extends ESTestCase { public void testSimpleBulk1() throws Exception { String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk.json"); - // translate Windows line endings (\r\n) to standard ones (\n) - if 
(Constants.WINDOWS) { - bulkAction = Strings.replace(bulkAction, "\r\n", "\n"); - } BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); assertThat(bulkRequest.numberOfActions(), equalTo(3)); @@ -74,7 +68,7 @@ public class BulkRequestTests extends ESTestCase { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); assertThat(bulkRequest.numberOfActions(), equalTo(1)); - assertThat(((IndexRequest) bulkRequest.requests().get(0)).source(), equalTo(new BytesArray("{ \"field1\" : \"value1\" }\r"))); + assertThat(((IndexRequest) bulkRequest.requests().get(0)).source(), equalTo(new BytesArray("{ \"field1\" : \"value1\" }"))); Map sourceMap = XContentHelper.convertToMap(((IndexRequest) bulkRequest.requests().get(0)).source(), false, XContentType.JSON).v2(); assertEquals("value1", sourceMap.get("field1")); diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 502ac26823c..0ee6dd27d41 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -312,7 +312,7 @@ public class RestControllerTests extends ESTestCase { } public void testDispatchWorksWithNewlineDelimitedJson() { - final String mimeType = randomFrom("application/x-ldjson", "application/x-ndjson"); + final String mimeType = "application/x-ndjson"; String content = randomAsciiOfLengthBetween(1, BREAKER_LIMIT.bytesAsInt()); FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY) .withContent(new BytesArray(content), null).withPath("/foo") diff --git a/docs/reference/docs/bulk.asciidoc b/docs/reference/docs/bulk.asciidoc index 90e730cbf97..d25641cacbd 100644 --- 
a/docs/reference/docs/bulk.asciidoc +++ b/docs/reference/docs/bulk.asciidoc @@ -21,8 +21,8 @@ Python:: ********************************************* -The REST API endpoint is `/_bulk`, and it expects the following JSON -structure: +The REST API endpoint is `/_bulk`, and it expects the following newline delimited JSON +(NDJSON) structure: [source,js] -------------------------------------------------- @@ -36,7 +36,9 @@ optional_source\n -------------------------------------------------- // NOTCONSOLE -*NOTE*: the final line of data must end with a newline character `\n`. +*NOTE*: the final line of data must end with a newline character `\n`. Each newline character +may be preceded by a carriage return `\r`. When sending requests to this endpoint the +`Content-Type` header should be set to `application/x-ndjson`. The possible actions are `index`, `create`, `delete` and `update`. `index` and `create` expect a source on the next diff --git a/docs/reference/search/multi-search.asciidoc b/docs/reference/search/multi-search.asciidoc index 96e0e7691ac..f9188808661 100644 --- a/docs/reference/search/multi-search.asciidoc +++ b/docs/reference/search/multi-search.asciidoc @@ -4,9 +4,10 @@ The multi search API allows to execute several search requests within the same API. The endpoint for it is `_msearch`. -The format of the request is similar to the bulk API format, and the -structure is as follows (the structure is specifically optimized to -reduce parsing if a specific search ends up redirected to another node): +The format of the request is similar to the bulk API format and makes +use of the newline delimited JSON (NDJSON) format. 
The structure is as +follows (the structure is specifically optimized to reduce parsing if +a specific search ends up redirected to another node): [source,js] -------------------------------------------------- @@ -17,6 +18,10 @@ body\n -------------------------------------------------- // NOTCONSOLE +*NOTE*: the final line of data must end with a newline character `\n`. Each newline character +may be preceded by a carriage return `\r`. When sending requests to this endpoint the +`Content-Type` header should be set to `application/x-ndjson`. + The header part includes which index / indices to search on, optional (mapping) types to search on, the `search_type`, `preference`, and `routing`. The body includes the typical search body request (including