Bulk: Failed preparsing does not fail whole bulk request

If a preparsing of the source is needed (due to mapping configuration,
which extracts the routing/id value from the source) and the source is not
valid JSON, then the whole bulk request is failed instead of a single
BulkRequest.

This commit ensures, that a broken JSON request is not forwarded to the
destination shard and creates an appropriate BulkItemResponse, which
includes a failure.

This also implied changing the BulkItemResponse serialization, because one
cannot be sure anymore, if a response includes an ID, in case it was not
specified and could not be extracted from the JSON.

Closes #4745
This commit is contained in:
Alexander Reelsen 2014-01-27 11:07:59 +01:00
parent 09575eb95f
commit 35e5432354
5 changed files with 108 additions and 11 deletions

View File

@ -264,7 +264,7 @@ public class BulkItemResponse implements Streamable {
if (in.readBoolean()) {
String fIndex = in.readSharedString();
String fType = in.readSharedString();
String fId = in.readString();
String fId = in.readOptionalString();
String fMessage = in.readString();
RestStatus status = RestStatus.readFrom(in);
failure = new Failure(fIndex, fType, fId, fMessage, status);
@ -294,7 +294,7 @@ public class BulkItemResponse implements Streamable {
out.writeBoolean(true);
out.writeSharedString(failure.getIndex());
out.writeSharedString(failure.getType());
out.writeString(failure.getId());
out.writeOptionalString(failure.getId());
out.writeString(failure.getMessage());
RestStatus.writeTo(out, failure.getStatus());
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -51,10 +52,7 @@ import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -153,7 +151,10 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
MetaData metaData = clusterState.metaData();
for (ActionRequest request : bulkRequest.requests) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
String aliasOrIndex = indexRequest.index();
@ -163,7 +164,15 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
if (metaData.hasIndex(indexRequest.index())) {
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type());
}
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
try {
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
} catch (ElasticsearchParseException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
@ -174,8 +183,6 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index()));
}
}
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();

View File

@ -566,7 +566,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter());
}
} catch (Exception e) {
throw new ElasticsearchParseException("failed to parse doc to extract routing/timestamp", e);
throw new ElasticsearchParseException("failed to parse doc to extract routing/timestamp/id", e);
} finally {
if (parser != null) {
parser.close();

View File

@ -19,6 +19,8 @@
package org.elasticsearch.document;
import com.google.common.base.Charsets;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
@ -30,6 +32,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -507,4 +510,84 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertThat(successes, equalTo(1));
}
@Test // issue 4745
public void preParsingSourceDueToMappingShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_timestamp")
.field("enabled", true)
.field("path", "last_modified")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);
String brokenBuildRequestData = "{\"index\": {\"_id\": \"1\"}}\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": {\"_id\": \"2\"}}\n" +
"{\"name\": \"Good\", \"last_modified\" : \"2013-04-05\"}\n";
BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));
assertExists(get("test", "type", "2"));
}
@Test // issue 4745
public void preParsingSourceDueToRoutingShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_routing")
.field("required", true)
.field("path", "my_routing")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);
ensureYellow("test");
String brokenBuildRequestData = "{\"index\": {} }\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": { \"_id\" : \"24000\" } }\n" +
"{\"name\": \"Good\", \"my_routing\" : \"48000\"}\n";
BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));
assertExists(client().prepareGet("test", "type", "24000").setRouting("48000").get());
}
@Test // issue 4745
public void preParsingSourceDueToIdShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_id")
.field("path", "my_id")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);
ensureYellow("test");
String brokenBuildRequestData = "{\"index\": {} }\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": {} }\n" +
"{\"name\": \"Good\", \"my_id\" : \"48\"}\n";
BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));
assertExists(get("test", "type", "48"));
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@ -52,6 +53,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
@ -149,6 +151,11 @@ public class ElasticsearchAssertions {
assertVersionSerializable(percolateResponse);
}
public static void assertExists(GetResponse response) {
String message = String.format(Locale.ROOT, "Expected %s/%s/%s to exist, but does not", response.getIndex(), response.getType(), response.getId());
assertThat(message, response.isExists(), is(true));
}
public static void assertFirstHit(SearchResponse searchResponse, Matcher<SearchHit> matcher) {
assertSearchHit(searchResponse, 1, matcher);
}