Add support for newline delimited JSON Content-Type (#22947)

This commit adds support for the newline delimited JSON Content-Type, which is how
the bulk, multi-search, and multi-search template APIs expect data to be formatted. The
`elasticsearch-js` client has also been using this content type for these types of requests.

Closes #22943
This commit is contained in:
Jay Modi 2017-02-07 09:20:06 -05:00 committed by GitHub
parent f5e7c25e24
commit c898e8ab83
10 changed files with 254 additions and 9 deletions

View File

@ -133,7 +133,8 @@ public enum XContentType implements Writeable {
return type;
}
}
if(mediaType.toLowerCase(Locale.ROOT).startsWith("application/*")) {
final String lowercaseMediaType = mediaType.toLowerCase(Locale.ROOT);
if (lowercaseMediaType.startsWith("application/*")) {
return JSON;
}
@ -152,6 +153,7 @@ public enum XContentType implements Writeable {
return type;
}
}
return null;
}

View File

@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -179,6 +180,10 @@ public class RestController extends AbstractComponent {
if (contentLength > 0 && hasContentTypeOrCanAutoDetect(request, handler) == false) {
sendContentTypeErrorMessage(request, responseChannel);
} else if (contentLength > 0 && handler != null && handler.supportsContentStream() &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" +
request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead"));
} else {
if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
@ -242,22 +247,39 @@ public class RestController extends AbstractComponent {
// be removed!
deprecationLogger.deprecated("Plain text request bodies are deprecated. Use request parameters or body " +
"in a supported format.");
} else if (restHandler != null && restHandler.supportsContentStream() && restRequest.header("Content-Type") != null) {
final String lowercaseMediaType = restRequest.header("Content-Type").toLowerCase(Locale.ROOT);
// we also support line-delimited JSON, which isn't official and has a few variations
// http://specs.okfnlabs.org/ndjson/
// https://github.com/ndjson/ndjson-spec/blob/48ea03cea6796b614cfbff4d4eb921f0b1d35c26/specification.md
if (lowercaseMediaType.equals("application/x-ldjson") || lowercaseMediaType.equals("application/x-ndjson")) {
restRequest.setXContentType(XContentType.JSON);
} else if (isContentTypeRequired) {
return false;
} else {
return autoDetectXContentType(restRequest);
}
} else if (isContentTypeRequired) {
return false;
} else {
deprecationLogger.deprecated("Content type detection for rest requests is deprecated. Specify the content type using " +
"the [Content-Type] header.");
XContentType xContentType = XContentFactory.xContentType(restRequest.content());
if (xContentType == null) {
return false;
} else {
restRequest.setXContentType(xContentType);
}
return autoDetectXContentType(restRequest);
}
}
return true;
}
private boolean autoDetectXContentType(RestRequest restRequest) {
deprecationLogger.deprecated("Content type detection for rest requests is deprecated. Specify the content type using " +
"the [Content-Type] header.");
XContentType xContentType = XContentFactory.xContentType(restRequest.content());
if (xContentType == null) {
return false;
} else {
restRequest.setXContentType(xContentType);
}
return true;
}
private void sendContentTypeErrorMessage(RestRequest restRequest, RestChannel channel) throws IOException {
final List<String> contentTypeHeader = restRequest.getAllHeaderValues("Content-Type");
final String errorMessage;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContent;
/**
* Handler for REST requests
@ -46,4 +47,13 @@ public interface RestHandler {
default boolean supportsPlainText() {
return false;
}
/**
* Indicates if the RestHandler supports content as a stream. A stream would be multiple objects delineated by
* {@link XContent#streamSeparator()}. If a handler returns true this will affect the types of content that can be sent to
* this endpoint.
*/
default boolean supportsContentStream() {
return false;
}
}

View File

@ -116,6 +116,11 @@ public class RestBulkAction extends BaseRestHandler {
});
}
@Override
public boolean supportsContentStream() {
return true;
}
static final class Fields {
static final String ITEMS = "items";
static final String ERRORS = "errors";

View File

@ -186,6 +186,11 @@ public class RestMultiSearchAction extends BaseRestHandler {
}
}
@Override
public boolean supportsContentStream() {
return true;
}
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.test.ESTestCase;
@ -68,6 +69,17 @@ public class BulkRequestTests extends ESTestCase {
assertThat(((IndexRequest) bulkRequest.requests().get(2)).source(), equalTo(new BytesArray("{ \"field1\" : \"value3\" }")));
}
public void testSimpleBulkWithCarriageReturn() throws Exception {
String bulkAction = "{ \"index\":{\"_index\":\"test\",\"_type\":\"type1\",\"_id\":\"1\"} }\r\n{ \"field1\" : \"value1\" }\r\n";
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(1));
assertThat(((IndexRequest) bulkRequest.requests().get(0)).source(), equalTo(new BytesArray("{ \"field1\" : \"value1\" }\r")));
Map<String, Object> sourceMap = XContentHelper.convertToMap(((IndexRequest) bulkRequest.requests().get(0)).source(),
false, XContentType.JSON).v2();
assertEquals("value1", sourceMap.get("field1"));
}
public void testSimpleBulk2() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk2.json");
BulkRequest bulkRequest = new BulkRequest();

View File

@ -80,6 +80,19 @@ public class MultiSearchRequestTests extends ESTestCase {
assertThat(request.requests().get(7).types().length, equalTo(0));
}
public void testSimpleAddWithCarriageReturn() throws Exception {
final String requestContent = "{\"index\":\"test\", \"ignore_unavailable\" : true, \"expand_wildcards\" : \"open,closed\"}}\r\n" +
"{\"query\" : {\"match_all\" :{}}}\r\n";
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
.withContent(new BytesArray(requestContent), XContentType.JSON).build();
MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, true);
assertThat(request.requests().size(), equalTo(1));
assertThat(request.requests().get(0).indices()[0], equalTo("test"));
assertThat(request.requests().get(0).indicesOptions(),
equalTo(IndicesOptions.fromOptions(true, true, true, true, IndicesOptions.strictExpandOpenAndForbidClosed())));
assertThat(request.requests().get(0).types().length, equalTo(0));
}
public void testSimpleAdd2() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch2.json");
assertThat(request.requests().size(), equalTo(5));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -43,6 +44,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.yaml.YamlXContent;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
@ -309,6 +311,155 @@ public class RestControllerTests extends ESTestCase {
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}
public void testDispatchWorksWithNewlineDelimitedJson() {
final String mimeType = randomFrom("application/x-ldjson", "application/x-ndjson");
String content = randomAsciiOfLengthBetween(1, BREAKER_LIMIT.bytesAsInt());
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(content), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList(mimeType))).build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}
public void testDispatchWithContentStream() {
final String mimeType = randomFrom("application/json", "application/smile");
String content = randomAsciiOfLengthBetween(1, BREAKER_LIMIT.bytesAsInt());
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(content), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList(mimeType))).build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}
public void testDispatchWithContentStreamAutoDetect() {
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("{}"), null).withPath("/foo").build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}
public void testNonStreamingXContentCausesErrorResponse() throws IOException {
// auto detect
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(YamlXContent.contentBuilder().startObject().endObject().bytes(), null).withPath("/foo").build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
// specified
fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(YamlXContent.contentBuilder().startObject().endObject().bytes(), XContentType.YAML).withPath("/foo").build();
channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}
public void testStrictModeContentStream() {
restController = new RestController(
Settings.builder().put(HttpTransportSettings.SETTING_HTTP_CONTENT_TYPE_REQUIRED.getKey(), true).build(),
Collections.emptySet(), null, null, circuitBreakerService);
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("{}"), null).withPath("/foo")
.build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}
public void testUnknownContentWithContentStream() {
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("aaaabbbbb"), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList("foo/bar")))
.build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}
private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
HttpServerTransport {

View File

@ -81,4 +81,9 @@ public class RestMultiSearchTemplateAction extends BaseRestHandler {
});
return multiRequest;
}
@Override
public boolean supportsContentStream() {
return true;
}
}

View File

@ -70,4 +70,24 @@ public class MultiSearchTemplateRequestTests extends ESTestCase {
assertEquals(1, request.requests().get(1).getScriptParams().size());
assertEquals(1, request.requests().get(2).getScriptParams().size());
}
public void testParseWithCarriageReturn() throws Exception {
final String content = "{\"index\":[\"test0\", \"test1\"], \"request_cache\": true}\r\n" +
"{\"inline\": {\"query\" : {\"match_{{template}}\" :{}}}, \"params\": {\"template\": \"all\" } }\r\n";
RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
.withContent(new BytesArray(content), XContentType.JSON).build();
MultiSearchTemplateRequest request = RestMultiSearchTemplateAction.parseRequest(restRequest, true);
assertThat(request.requests().size(), equalTo(1));
assertThat(request.requests().get(0).getRequest().indices()[0], equalTo("test0"));
assertThat(request.requests().get(0).getRequest().indices()[1], equalTo("test1"));
assertThat(request.requests().get(0).getRequest().indices(), arrayContaining("test0", "test1"));
assertThat(request.requests().get(0).getRequest().requestCache(), equalTo(true));
assertThat(request.requests().get(0).getRequest().preference(), nullValue());
assertNotNull(request.requests().get(0).getScript());
assertEquals(ScriptType.INLINE, request.requests().get(0).getScriptType());
assertEquals("{\"query\":{\"match_{{template}}\":{}}}", request.requests().get(0).getScript());
assertEquals(1, request.requests().get(0).getScriptParams().size());
}
}