Bulk: throw exception if unrecognized parameter in action/metadata line

Closes #10977
This commit is contained in:
Tanguy Leroux 2015-05-25 15:06:56 +02:00
parent 9d5e789508
commit acb07c72b9
6 changed files with 121 additions and 33 deletions

View File

@ -246,6 +246,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
XContent xContent = XContentFactory.xContent(data);
int line = 0;
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
@ -254,8 +255,9 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
if (nextMarker == -1) {
break;
}
// now parse the action
line++;
// now parse the action
try (XContentParser parser = xContent.createParser(data.slice(from, nextMarker - from))) {
// move pointers
from = nextMarker + 1;
@ -285,43 +287,53 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("_index".equals(currentFieldName)) {
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = parser.text();
} else if ("_type".equals(currentFieldName)) {
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) {
parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null).millis();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("_index".equals(currentFieldName)) {
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = parser.text();
} else if ("_type".equals(currentFieldName)) {
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) {
parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null).millis();
} else {
ttl = parser.longValue();
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
} else {
ttl = parser.longValue();
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]");
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
} else {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + XContentParser.Token.START_OBJECT
+ " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]");
}
if ("delete".equals(action)) {
@ -331,6 +343,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
if (nextMarker == -1) {
break;
}
line++;
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.

View File

@ -117,4 +117,56 @@ public class BulkRequestTests extends ElasticsearchTestCase {
assertThat(bulkRequest.requests().get(1), instanceOf(UpdateRequest.class));
assertThat(bulkRequest.requests().get(2), instanceOf(DeleteRequest.class));
}
@Test
public void testSimpleBulk6() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk6.json");
BulkRequest bulkRequest = new BulkRequest();
try {
bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null);
fail("should have thrown an exception about the wrong format of line 1");
} catch (IllegalArgumentException e) {
assertThat("message contains error about the wrong format of line 1: " + e.getMessage(),
e.getMessage().contains("Malformed action/metadata line [1], expected a simple value for field [_source] but found [START_OBJECT]"), equalTo(true));
}
}
@Test
public void testSimpleBulk7() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk7.json");
BulkRequest bulkRequest = new BulkRequest();
try {
bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null);
fail("should have thrown an exception about the wrong format of line 5");
} catch (IllegalArgumentException e) {
assertThat("message contains error about the wrong format of line 5: " + e.getMessage(),
e.getMessage().contains("Malformed action/metadata line [5], expected a simple value for field [_unkown] but found [START_ARRAY]"), equalTo(true));
}
}
@Test
public void testSimpleBulk8() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk8.json");
BulkRequest bulkRequest = new BulkRequest();
try {
bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null);
fail("should have thrown an exception about the unknown paramater _foo");
} catch (IllegalArgumentException e) {
assertThat("message contains error about the unknown paramater _foo: " + e.getMessage(),
e.getMessage().contains("Action/metadata line [3] contains an unknown parameter [_foo]"), equalTo(true));
}
}
@Test
public void testSimpleBulk9() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk9.json");
BulkRequest bulkRequest = new BulkRequest();
try {
bulkRequest.add(bulkAction.getBytes(Charsets.UTF_8), 0, bulkAction.length(), null, null);
fail("should have thrown an exception about the wrong format of line 3");
} catch (IllegalArgumentException e) {
assertThat("message contains error about the wrong format of line 3: " + e.getMessage(),
e.getMessage().contains("Malformed action/metadata line [3], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"), equalTo(true));
}
}
}

View File

@ -0,0 +1,6 @@
{"index": {"_index": "test", "_type": "doc", "_source": {"hello": "world"}, "_id": 0}}
{"field1": "value0"}
{"index": {"_index": "test", "_type": "doc", "_id": 1}}
{"field1": "value1"}
{"index": {"_index": "test", "_type": "doc", "_id": 2}}
{"field1": "value2"}

View File

@ -0,0 +1,6 @@
{"index": {"_index": "test", "_type": "doc", "_id": 0}}
{"field1": "value0"}
{"index": {"_index": "test", "_type": "doc", "_id": 1}}
{"field1": "value1"}
{"index": {"_index": "test", "_type": "doc", "_id": 2, "_unkown": ["foo", "bar"]}}
{"field1": "value2"}

View File

@ -0,0 +1,6 @@
{"index": {"_index": "test", "_type": "doc", "_id": 0}}
{"field1": "value0"}
{"index": {"_index": "test", "_type": "doc", "_id": 1, "_foo": "bar"}}
{"field1": "value1"}
{"index": {"_index": "test", "_type": "doc", "_id": 2}}
{"field1": "value2"}

View File

@ -0,0 +1,4 @@
{"index": {}}
{"field1": "value0"}
{"index": ["bar"] }
{"field1": "value1"}