Bulk indexing issue - missing parent routing causes NullPointerException.

Now each error is reported in bulk response rather than causing entire bulk to fail.
Added a Junit test but the use of TransportClient means the error is manifested differently to a REST based request - instead of a NullPointer the whole of the bulk request failed with a RoutingMissingException. Changed TransportBulkAction to catch this exception and treat it the same as the existing logic for a ElasticsearchParseException - the individual bulk request items are flagged and reported individually rather than failing the whole bulk request.

Closes #8365
This commit is contained in:
markharwood 2014-11-17 17:14:05 +00:00
parent d1c6d3b7b0
commit 6f79d67f81
2 changed files with 51 additions and 7 deletions

View File

@ -19,17 +19,17 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.elasticsearch.action.RoutingMissingException;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@ -229,7 +229,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} }
try { try {
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex); indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
} catch (ElasticsearchParseException e) { } catch (ElasticsearchParseException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
responses.set(i, bulkItemResponse); responses.set(i, bulkItemResponse);
@ -287,8 +287,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index()); String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type()); MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type());
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) { if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
//Bulk update child doc, NPE error message when parent is not specified #8365 BulkItemResponse.Failure failure = new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(),
throw new RoutingMissingException(concreteIndex, updateRequest.type(), updateRequest.id()); updateRequest.id(), "routing is required for this item", RestStatus.BAD_REQUEST);
responses.set(i, new BulkItemResponse(i, updateRequest.type(), failure));
continue;
} }
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId(); ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId); List<BulkItemRequest> list = requestsByShard.get(shardId);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.document; package org.elasticsearch.document;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
@ -47,8 +48,15 @@ import java.util.ArrayList;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class BulkTests extends ElasticsearchIntegrationTest { public class BulkTests extends ElasticsearchIntegrationTest {
@ -474,6 +482,40 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertSearchHits(searchResponse, "child1"); assertSearchHits(searchResponse, "child1");
} }
/*
* Test for https://github.com/elasticsearch/elasticsearch/issues/8365
*/
@Test
public void testBulkUpdateChildMissingParentRouting() throws Exception {
assertAcked(prepareCreate("test").addMapping("parent", "{\"parent\":{}}").addMapping("child",
"{\"child\": {\"_parent\": {\"type\": \"parent\"}}}"));
ensureGreen();
BulkRequestBuilder builder = client().prepareBulk();
byte[] addParent = new BytesArray("{\"index\" : { \"_index\" : \"test\", \"_type\" : \"parent\", \"_id\" : \"parent1\"}}\n"
+ "{\"field1\" : \"value1\"}\n").array();
byte[] addChildOK = new BytesArray(
"{\"index\" : { \"_id\" : \"child1\", \"_type\" : \"child\", \"_index\" : \"test\", \"parent\" : \"parent1\"} }\n"
+ "{ \"field1\" : \"value1\"}\n").array();
byte[] addChildMissingRouting = new BytesArray(
"{\"index\" : { \"_id\" : \"child2\", \"_type\" : \"child\", \"_index\" : \"test\"} }\n" + "{ \"field1\" : \"value1\"}\n")
.array();
builder.add(addParent, 0, addParent.length, false);
builder.add(addChildOK, 0, addChildOK.length, false);
builder.add(addChildMissingRouting, 0, addChildMissingRouting.length, false);
builder.add(addChildOK, 0, addChildOK.length, false);
BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.getItems().length, equalTo(4));
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[2].isFailed(), equalTo(true));
assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
}
@Test @Test
public void testFailingVersionedUpdatedOnBulk() throws Exception { public void testFailingVersionedUpdatedOnBulk() throws Exception {
createIndex("test"); createIndex("test");