make sure to close the bulk request parser when done parsing the header for each item

This commit is contained in:
Shay Banon 2012-02-20 22:13:09 +02:00
parent 790e727843
commit 1b082b00ba
1 changed files with 95 additions and 91 deletions

View File

@ -99,99 +99,103 @@ public class BulkRequest implements ActionRequest {
// now parse the action
XContentParser parser = xContent.createParser(data, from, nextMarker - from);
// move pointers
from = nextMarker + 1;
// Move to START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token == null) {
continue;
}
assert token == XContentParser.Token.START_OBJECT;
// Move to FIELD_NAME, that's the action
token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME;
String action = parser.currentName();
String index = defaultIndex;
String type = defaultType;
String id = null;
String routing = null;
String parent = null;
String timestamp = null;
Long ttl = null;
String opType = null;
long version = 0;
VersionType versionType = VersionType.INTERNAL;
String percolate = null;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("_index".equals(currentFieldName)) {
index = parser.text();
} else if ("_type".equals(currentFieldName)) {
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) {
parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null).millis();
} else {
ttl = parser.longValue();
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
} else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) {
percolate = parser.textOrNull();
}
}
}
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing));
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request. All index requests are still unsafe if applicable.
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
}
try {
// move pointers
from = nextMarker + 1;
// Move to START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token == null) {
continue;
}
assert token == XContentParser.Token.START_OBJECT;
// Move to FIELD_NAME, that's the action
token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME;
String action = parser.currentName();
String index = defaultIndex;
String type = defaultType;
String id = null;
String routing = null;
String parent = null;
String timestamp = null;
Long ttl = null;
String opType = null;
long version = 0;
VersionType versionType = VersionType.INTERNAL;
String percolate = null;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("_index".equals(currentFieldName)) {
index = parser.text();
} else if ("_type".equals(currentFieldName)) {
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName) || "routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) {
parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null).millis();
} else {
ttl = parser.longValue();
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
} else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) {
percolate = parser.textOrNull();
}
}
}
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing));
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request. All index requests are still unsafe if applicable.
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
}
// move pointers
from = nextMarker + 1;
}
} finally {
parser.close();
}
}
return this;