diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 3c7472ea8ab..5f2519424a6 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -99,99 +99,103 @@ public class BulkRequest implements ActionRequest { // now parse the action XContentParser parser = xContent.createParser(data, from, nextMarker - from); - // move pointers - from = nextMarker + 1; - - // Move to START_OBJECT - XContentParser.Token token = parser.nextToken(); - if (token == null) { - continue; - } - assert token == XContentParser.Token.START_OBJECT; - // Move to FIELD_NAME, that's the action - token = parser.nextToken(); - assert token == XContentParser.Token.FIELD_NAME; - String action = parser.currentName(); - - String index = defaultIndex; - String type = defaultType; - String id = null; - String routing = null; - String parent = null; - String timestamp = null; - Long ttl = null; - String opType = null; - long version = 0; - VersionType versionType = VersionType.INTERNAL; - String percolate = null; - - // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) - // or START_OBJECT which will have another set of parameters - - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token.isValue()) { - if ("_index".equals(currentFieldName)) { - index = parser.text(); - } else if ("_type".equals(currentFieldName)) { - type = parser.text(); - } else if ("_id".equals(currentFieldName)) { - id = parser.text(); - } else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) { - routing = parser.text(); - } else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) { - parent = parser.text(); - } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { - timestamp = parser.text(); - } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { - if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { - ttl = TimeValue.parseTimeValue(parser.text(), null).millis(); - } else { - ttl = parser.longValue(); - } - } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { - opType = parser.text(); - } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { - version = parser.longValue(); - } else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) { - versionType = VersionType.fromString(parser.text()); - } else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) { - percolate = parser.textOrNull(); - } - } - } - - if ("delete".equals(action)) { - add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing)); - } else { - nextMarker = findNextMarker(marker, from, data, length); - if (nextMarker == -1) { - break; - } - // order is important, we set parent after routing, so routing will be set to parent if not set explicitly - // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks - // of index request. All index requests are still unsafe if applicable. - if ("index".equals(action)) { - if (opType == null) { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .source(data, from, nextMarker - from, contentUnsafe) - .percolate(percolate)); - } else { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .create("create".equals(opType)) - .source(data, from, nextMarker - from, contentUnsafe) - .percolate(percolate)); - } - } else if ("create".equals(action)) { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) - .create(true) - .source(data, from, nextMarker - from, contentUnsafe) - .percolate(percolate)); - } + try { // move pointers from = nextMarker + 1; + + // Move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token == null) { + continue; + } + assert token == XContentParser.Token.START_OBJECT; + // Move to FIELD_NAME, that's the action + token = parser.nextToken(); + assert token == XContentParser.Token.FIELD_NAME; + String action = parser.currentName(); + + String index = defaultIndex; + String type = defaultType; + String id = null; + String routing = null; + String parent = null; + String timestamp = null; + Long ttl = null; + String opType = null; + long version = 0; + VersionType versionType = VersionType.INTERNAL; + String percolate = null; + + // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) + // or START_OBJECT which will have another set of parameters + + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("_index".equals(currentFieldName)) { + index = parser.text(); + } else if ("_type".equals(currentFieldName)) { + type = parser.text(); + } else if ("_id".equals(currentFieldName)) { + id = parser.text(); + } else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) { + routing = parser.text(); + } else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) { + parent = parser.text(); + } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { + timestamp = parser.text(); + } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { + if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + ttl = TimeValue.parseTimeValue(parser.text(), null).millis(); + } else { + ttl = parser.longValue(); + } + } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { + opType = parser.text(); + } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { + version = parser.longValue(); + } else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) { + versionType = VersionType.fromString(parser.text()); + } else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) { + percolate = parser.textOrNull(); + } + } + } + + if ("delete".equals(action)) { + add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing)); + } else { + nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + // order is important, we set parent after routing, so routing will be set to parent if not set explicitly + // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks + // of index request. All index requests are still unsafe if applicable. + if ("index".equals(action)) { + if (opType == null) { + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) + .source(data, from, nextMarker - from, contentUnsafe) + .percolate(percolate)); + } else { + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) + .create("create".equals(opType)) + .source(data, from, nextMarker - from, contentUnsafe) + .percolate(percolate)); + } + } else if ("create".equals(action)) { + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) + .create(true) + .source(data, from, nextMarker - from, contentUnsafe) + .percolate(percolate)); + } + // move pointers + from = nextMarker + 1; + } + } finally { + parser.close(); } } return this;