Fix: Use correct OpType on Failure in BulkItemResponse
When a bulk request fails on a Delete or Update request, the BulkItemResponse reports incorrect "index" operation in the response. This PR fixes this for the case of closed indices as reported in #9821 but also for other failures and adds tests for the two cases covered. Closes #9821
This commit is contained in:
parent
ff0f480938
commit
201dc87aae
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
|
@ -28,7 +27,6 @@ import org.elasticsearch.action.update.UpdateResponse;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -168,13 +168,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
} else if (request instanceof DeleteRequest) {
|
||||
DeleteRequest deleteRequest = (DeleteRequest) request;
|
||||
if (index.equals(deleteRequest.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||
responses.set(idx, new BulkItemResponse(idx, "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||
return true;
|
||||
}
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||
if (index.equals(updateRequest.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
responses.set(idx, new BulkItemResponse(idx, "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
|
@ -379,7 +379,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
if (unavailableException != null) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
|
||||
unavailableException);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure);
|
||||
String operationType = "unknown";
|
||||
if (request instanceof IndexRequest) {
|
||||
operationType = "index";
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
operationType = "delete";
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
operationType = "update";
|
||||
}
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, operationType, failure);
|
||||
responses.set(idx, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(idx, null);
|
||||
|
|
|
@ -725,5 +725,39 @@ public class BulkTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(bulkResponse.hasFailures(), is(true));
|
||||
assertThat(bulkResponse.getItems().length, is(5));
|
||||
}
|
||||
|
||||
@Test // issue 9821
|
||||
public void testFailedRequestsOnClosedIndex() throws Exception {
|
||||
createIndex("bulkindex1");
|
||||
ensureYellow();
|
||||
|
||||
client().prepareIndex("bulkindex1", "index1_type", "1").setSource("text", "test").get();
|
||||
assertAcked(client().admin().indices().prepareClose("bulkindex1"));
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1"))
|
||||
.add(new UpdateRequest("bulkindex1", "index1_type", "1").doc("foo", "bar"))
|
||||
.add(new DeleteRequest("bulkindex1", "index1_type", "1")).refresh(true);
|
||||
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
|
||||
assertThat(bulkResponse.hasFailures(), is(true));
|
||||
BulkItemResponse[] responseItems = bulkResponse.getItems();
|
||||
assertThat(responseItems.length, is(3));
|
||||
assertThat(responseItems[0].getOpType(), is("index"));
|
||||
assertThat(responseItems[1].getOpType(), is("update"));
|
||||
assertThat(responseItems[2].getOpType(), is("delete"));
|
||||
}
|
||||
|
||||
@Test // issue 9821
|
||||
public void testInvalidIndexNamesCorrectOpType() {
|
||||
BulkResponse bulkResponse = client().prepareBulk()
|
||||
.add(client().prepareIndex().setIndex("INVALID.NAME").setType("type1").setId("1").setSource("field", 1))
|
||||
.add(client().prepareUpdate().setIndex("INVALID.NAME").setType("type1").setId("1").setDoc("field", randomInt()))
|
||||
.add(client().prepareDelete().setIndex("INVALID.NAME").setType("type1").setId("1")).get();
|
||||
assertThat(bulkResponse.getItems().length, is(3));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), is("index"));
|
||||
assertThat(bulkResponse.getItems()[1].getOpType(), is("update"));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), is("delete"));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue