Add parsing method to bulk response (#23234)

This commit adds the `fromXContent()` parsing method to BulkResponse.
This commit is contained in:
Tanguy Leroux 2017-02-21 10:49:40 +01:00 committed by GitHub
parent c88eb00b83
commit 39ed76c58b
3 changed files with 185 additions and 34 deletions

View File

@ -23,17 +23,32 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;
/** /**
* A response of a bulk execution. Holding a response for each item responding (in order) of the * A response of a bulk execution. Holding a response for each item responding (in order) of the
* bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the * bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the
* failure message). * failure message).
*/ */
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse> { public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse>, StatusToXContentObject {
private static final String ITEMS = "items";
private static final String ERRORS = "errors";
private static final String TOOK = "took";
private static final String INGEST_TOOK = "ingest_took";
public static final long NO_INGEST_TOOK = -1L; public static final long NO_INGEST_TOOK = -1L;
@ -141,4 +156,61 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
out.writeVLong(tookInMillis); out.writeVLong(tookInMillis);
out.writeZLong(ingestTookInMillis); out.writeZLong(ingestTookInMillis);
} }
@Override
public RestStatus status() {
return RestStatus.OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(TOOK, tookInMillis);
if (ingestTookInMillis != BulkResponse.NO_INGEST_TOOK) {
builder.field(INGEST_TOOK, ingestTookInMillis);
}
builder.field(ERRORS, hasFailures());
builder.startArray(ITEMS);
for (BulkItemResponse item : this) {
item.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
public static BulkResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
long took = -1L;
long ingestTook = NO_INGEST_TOOK;
List<BulkItemResponse> items = new ArrayList<>();
String currentFieldName = parser.currentName();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (TOOK.equals(currentFieldName)) {
took = parser.longValue();
} else if (INGEST_TOOK.equals(currentFieldName)) {
ingestTook = parser.longValue();
} else if (ERRORS.equals(currentFieldName) == false) {
throwUnknownField(currentFieldName, parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (ITEMS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
items.add(BulkItemResponse.fromXContent(parser, items.size()));
}
} else {
throwUnknownField(currentFieldName, parser.getTokenLocation());
}
} else {
throwUnknownToken(token, parser.getTokenLocation());
}
}
return new BulkResponse(items.toArray(new BulkItemResponse[items.size()]), took, ingestTook);
}
} }

View File

@ -19,9 +19,7 @@
package org.elasticsearch.rest.action.document; package org.elasticsearch.rest.action.document;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -30,20 +28,16 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT; import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
* <pre> * <pre>
@ -95,36 +89,11 @@ public class RestBulkAction extends BaseRestHandler {
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields, bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields,
defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType()); defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType());
return channel -> client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) { return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
@Override
public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field(Fields.TOOK, response.getTookInMillis());
if (response.getIngestTookInMillis() != BulkResponse.NO_INGEST_TOOK) {
builder.field(Fields.INGEST_TOOK, response.getIngestTookInMillis());
}
builder.field(Fields.ERRORS, response.hasFailures());
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
itemResponse.toXContent(builder, request);
}
builder.endArray();
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
} }
@Override @Override
public boolean supportsContentStream() { public boolean supportsContentStream() {
return true; return true;
} }
static final class Fields {
static final String ITEMS = "items";
static final String ERRORS = "errors";
static final String TOOK = "took";
static final String INGEST_TOOK = "ingest_took";
}
} }

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponseTests;
import org.elasticsearch.action.index.IndexResponseTests;
import org.elasticsearch.action.update.UpdateResponseTests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.ElasticsearchExceptionTests.randomExceptions;
import static org.elasticsearch.action.bulk.BulkItemResponseTests.assertBulkItemResponse;
import static org.elasticsearch.action.bulk.BulkResponse.NO_INGEST_TOOK;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
public class BulkResponseTests extends ESTestCase {
public void testToAndFromXContent() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
boolean humanReadable = randomBoolean();
long took = randomFrom(randomNonNegativeLong(), -1L);
long ingestTook = randomFrom(randomNonNegativeLong(), NO_INGEST_TOOK);
int nbBulkItems = randomIntBetween(1, 10);
BulkItemResponse[] bulkItems = new BulkItemResponse[nbBulkItems];
BulkItemResponse[] expectedBulkItems = new BulkItemResponse[nbBulkItems];
for (int i = 0; i < nbBulkItems; i++) {
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
if (frequently()) {
Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = null;
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
randomDocWriteResponses = IndexResponseTests.randomIndexResponse();
} else if (opType == DocWriteRequest.OpType.DELETE) {
randomDocWriteResponses = DeleteResponseTests.randomDeleteResponse();
} else if (opType == DocWriteRequest.OpType.UPDATE) {
randomDocWriteResponses = UpdateResponseTests.randomUpdateResponse(xContentType);
} else {
fail("Test does not support opType [" + opType + "]");
}
bulkItems[i] = new BulkItemResponse(i, opType, randomDocWriteResponses.v1());
expectedBulkItems[i] = new BulkItemResponse(i, opType, randomDocWriteResponses.v2());
} else {
String index = randomAsciiOfLength(5);
String type = randomAsciiOfLength(5);
String id = randomAsciiOfLength(5);
Tuple<Throwable, ElasticsearchException> failures = randomExceptions();
bulkItems[i] = new BulkItemResponse(i, opType, new BulkItemResponse.Failure(index, type, id, (Exception) failures.v1()));
expectedBulkItems[i] = new BulkItemResponse(i, opType, new BulkItemResponse.Failure(index, type, id, failures.v2()));
}
}
BulkResponse bulkResponse = new BulkResponse(bulkItems, took, ingestTook);
BytesReference originalBytes = toXContent(bulkResponse, xContentType, humanReadable);
if (randomBoolean()) {
try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
originalBytes = shuffleXContent(parser, randomBoolean()).bytes();
}
}
BulkResponse parsedBulkResponse;
try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
parsedBulkResponse = BulkResponse.fromXContent(parser);
assertNull(parser.nextToken());
}
assertEquals(took, parsedBulkResponse.getTookInMillis());
assertEquals(ingestTook, parsedBulkResponse.getIngestTookInMillis());
assertEquals(expectedBulkItems.length, parsedBulkResponse.getItems().length);
for (int i = 0; i < expectedBulkItems.length; i++) {
assertBulkItemResponse(expectedBulkItems[i], parsedBulkResponse.getItems()[i]);
}
BytesReference finalBytes = toXContent(parsedBulkResponse, xContentType, humanReadable);
BytesReference expectedFinalBytes = toXContent(parsedBulkResponse, xContentType, humanReadable);
assertToXContentEquivalent(expectedFinalBytes, finalBytes, xContentType);
}
}