From acb07c72b9d2fd6e21f9710ffbdf4573c01c9f22 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 25 May 2015 15:06:56 +0200 Subject: [PATCH] Bulk: throw exception if unrecognized parameter in action/metadata line Closes #10977 --- .../action/bulk/BulkRequest.java | 80 +++++++++++-------- .../action/bulk/BulkRequestTests.java | 52 ++++++++++++ .../action/bulk/simple-bulk6.json | 6 ++ .../action/bulk/simple-bulk7.json | 6 ++ .../action/bulk/simple-bulk8.json | 6 ++ .../action/bulk/simple-bulk9.json | 4 + 6 files changed, 121 insertions(+), 33 deletions(-) create mode 100644 src/test/java/org/elasticsearch/action/bulk/simple-bulk6.json create mode 100644 src/test/java/org/elasticsearch/action/bulk/simple-bulk7.json create mode 100644 src/test/java/org/elasticsearch/action/bulk/simple-bulk8.json create mode 100644 src/test/java/org/elasticsearch/action/bulk/simple-bulk9.json diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 617c3fc32bd..715c1d716d9 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -246,6 +246,7 @@ public class BulkRequest extends ActionRequest implements Composite public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { XContent xContent = XContentFactory.xContent(data); + int line = 0; int from = 0; int length = data.length(); byte marker = xContent.streamSeparator(); @@ -254,8 +255,9 @@ public class BulkRequest extends ActionRequest implements Composite if (nextMarker == -1) { break; } - // now parse the action + line++; + // now parse the action try (XContentParser parser = xContent.createParser(data.slice(from, nextMarker - from))) { // move pointers from = nextMarker + 1; @@ -285,43 +287,53 @@ public class BulkRequest extends ActionRequest implements Composite // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters + token = parser.nextToken(); - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token.isValue()) { - if ("_index".equals(currentFieldName)) { - if (!allowExplicitIndex) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } - index = parser.text(); - } else if ("_type".equals(currentFieldName)) { - type = parser.text(); - } else if ("_id".equals(currentFieldName)) { - id = parser.text(); - } else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) { - routing = parser.text(); - } else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) { - parent = parser.text(); - } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { - timestamp = parser.text(); - } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { - if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { - ttl = TimeValue.parseTimeValue(parser.text(), null).millis(); + if (token == XContentParser.Token.START_OBJECT) { + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("_index".equals(currentFieldName)) { + if (!allowExplicitIndex) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = parser.text(); + } else if ("_type".equals(currentFieldName)) { + type = parser.text(); + } else if ("_id".equals(currentFieldName)) { + id = parser.text(); + } else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) { + routing = parser.text(); + } else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) { + parent = parser.text(); + } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { + timestamp = parser.text(); + } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { + if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + ttl = TimeValue.parseTimeValue(parser.text(), null).millis(); + } else { + ttl = parser.longValue(); + } + } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { + opType = parser.text(); + } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { + version = parser.longValue(); + } else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) { + versionType = VersionType.fromString(parser.text()); + } else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) { + retryOnConflict = parser.intValue(); } else { - ttl = parser.longValue(); + throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"); } - } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { - opType = parser.text(); - } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { - version = parser.longValue(); - } else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) { - versionType = VersionType.fromString(parser.text()); - } else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) { - retryOnConflict = parser.intValue(); + } else { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]"); } } + } else if (token != XContentParser.Token.END_OBJECT) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + XContentParser.Token.START_OBJECT + + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]"); } if ("delete".equals(action)) { @@ -331,6 +343,8 @@ public class BulkRequest extends ActionRequest implements Composite if (nextMarker == -1) { break; } + line++; + // order is important, we set parent after routing, so routing will be set to parent if not set explicitly // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks // of index request. diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 3e1e762b45c..040bb81ef6b 100644 --- a/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -117,4 +117,56 @@ public class BulkRequestTests extends ElasticsearchTestCase { assertThat(bulkRequest.requests().get(1), instanceOf(UpdateRequest.class)); assertThat(bulkRequest.requests().get(2), instanceOf(DeleteRequest.class)); } + + @Test + public void testSimpleBulk6() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk6.json"); + BulkRequest bulkRequest = new BulkRequest(); + try { + bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null); + fail("should have thrown an exception about the wrong format of line 1"); + } catch (IllegalArgumentException e) { + assertThat("message contains error about the wrong format of line 1: " + e.getMessage(), + e.getMessage().contains("Malformed action/metadata line [1], expected a simple value for field [_source] but found [START_OBJECT]"), equalTo(true)); + } + } + + @Test + public void testSimpleBulk7() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk7.json"); + BulkRequest bulkRequest = new BulkRequest(); + try { + bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null); + fail("should have thrown an exception about the wrong format of line 5"); + } catch (IllegalArgumentException e) { + assertThat("message contains error about the wrong format of line 5: " + e.getMessage(), + e.getMessage().contains("Malformed action/metadata line [5], expected a simple value for field [_unkown] but found [START_ARRAY]"), equalTo(true)); + } + } + + @Test + public void testSimpleBulk8() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk8.json"); + BulkRequest bulkRequest = new BulkRequest(); + try { + bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null); + fail("should have thrown an exception about the unknown paramater _foo"); + } catch (IllegalArgumentException e) { + assertThat("message contains error about the unknown paramater _foo: " + e.getMessage(), + e.getMessage().contains("Action/metadata line [3] contains an unknown parameter [_foo]"), equalTo(true)); + } + } + + @Test + public void testSimpleBulk9() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk9.json"); + BulkRequest bulkRequest = new BulkRequest(); + try { + bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null); + fail("should have thrown an exception about the wrong format of line 3"); + } catch (IllegalArgumentException e) { + assertThat("message contains error about the wrong format of line 3: " + e.getMessage(), + e.getMessage().contains("Malformed action/metadata line [3], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"), equalTo(true)); + } + } } diff --git a/src/test/java/org/elasticsearch/action/bulk/simple-bulk6.json b/src/test/java/org/elasticsearch/action/bulk/simple-bulk6.json new file mode 100644 index 00000000000..e9c97965595 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/simple-bulk6.json @@ -0,0 +1,6 @@ +{"index": {"_index": "test", "_type": "doc", "_source": {"hello": "world"}, "_id": 0}} +{"field1": "value0"} +{"index": {"_index": "test", "_type": "doc", "_id": 1}} +{"field1": "value1"} +{"index": {"_index": "test", "_type": "doc", "_id": 2}} +{"field1": "value2"} diff --git a/src/test/java/org/elasticsearch/action/bulk/simple-bulk7.json b/src/test/java/org/elasticsearch/action/bulk/simple-bulk7.json new file mode 100644 index 00000000000..a642d9ce4fe --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/simple-bulk7.json @@ -0,0 +1,6 @@ +{"index": {"_index": "test", "_type": "doc", "_id": 0}} +{"field1": "value0"} +{"index": {"_index": "test", "_type": "doc", "_id": 1}} +{"field1": "value1"} +{"index": {"_index": "test", "_type": "doc", "_id": 2, "_unkown": ["foo", "bar"]}} +{"field1": "value2"} diff --git a/src/test/java/org/elasticsearch/action/bulk/simple-bulk8.json b/src/test/java/org/elasticsearch/action/bulk/simple-bulk8.json new file mode 100644 index 00000000000..c1a94b1d159 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/simple-bulk8.json @@ -0,0 +1,6 @@ +{"index": {"_index": "test", "_type": "doc", "_id": 0}} +{"field1": "value0"} +{"index": {"_index": "test", "_type": "doc", "_id": 1, "_foo": "bar"}} +{"field1": "value1"} +{"index": {"_index": "test", "_type": "doc", "_id": 2}} +{"field1": "value2"} diff --git a/src/test/java/org/elasticsearch/action/bulk/simple-bulk9.json b/src/test/java/org/elasticsearch/action/bulk/simple-bulk9.json new file mode 100644 index 00000000000..ebdbf750116 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/simple-bulk9.json @@ -0,0 +1,4 @@ +{"index": {}} +{"field1": "value0"} +{"index": ["bar"] } +{"field1": "value1"}