Merge pull request #12060 from cbuescher/fix/9821

Fix: Use correct OpType on Failure in BulkItemResponse
This commit is contained in:
Christoph Büscher 2015-07-07 09:29:29 +02:00
commit 35ddc749b1
3 changed files with 45 additions and 5 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
@ -28,7 +27,6 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;

View File

@ -168,13 +168,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} else if (request instanceof DeleteRequest) { } else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request; DeleteRequest deleteRequest = (DeleteRequest) request;
if (index.equals(deleteRequest.index())) { if (index.equals(deleteRequest.index())) {
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e))); responses.set(idx, new BulkItemResponse(idx, "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e)));
return true; return true;
} }
} else if (request instanceof UpdateRequest) { } else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request; UpdateRequest updateRequest = (UpdateRequest) request;
if (index.equals(updateRequest.index())) { if (index.equals(updateRequest.index())) {
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e))); responses.set(idx, new BulkItemResponse(idx, "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e)));
return true; return true;
} }
} else { } else {
@ -379,7 +379,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (unavailableException != null) { if (unavailableException != null) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
unavailableException); unavailableException);
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); String operationType = "unknown";
if (request instanceof IndexRequest) {
operationType = "index";
} else if (request instanceof DeleteRequest) {
operationType = "delete";
} else if (request instanceof UpdateRequest) {
operationType = "update";
}
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, operationType, failure);
responses.set(idx, bulkItemResponse); responses.set(idx, bulkItemResponse);
// make sure the request gets never processed again // make sure the request gets never processed again
bulkRequest.requests.set(idx, null); bulkRequest.requests.set(idx, null);

View File

@ -725,5 +725,39 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertThat(bulkResponse.hasFailures(), is(true)); assertThat(bulkResponse.hasFailures(), is(true));
assertThat(bulkResponse.getItems().length, is(5)); assertThat(bulkResponse.getItems().length, is(5));
} }
@Test // issue 9821
public void testFailedRequestsOnClosedIndex() throws Exception {
createIndex("bulkindex1");
ensureYellow();
client().prepareIndex("bulkindex1", "index1_type", "1").setSource("text", "test").get();
assertAcked(client().admin().indices().prepareClose("bulkindex1"));
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1"))
.add(new UpdateRequest("bulkindex1", "index1_type", "1").doc("foo", "bar"))
.add(new DeleteRequest("bulkindex1", "index1_type", "1")).refresh(true);
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
assertThat(bulkResponse.hasFailures(), is(true));
BulkItemResponse[] responseItems = bulkResponse.getItems();
assertThat(responseItems.length, is(3));
assertThat(responseItems[0].getOpType(), is("index"));
assertThat(responseItems[1].getOpType(), is("update"));
assertThat(responseItems[2].getOpType(), is("delete"));
}
@Test // issue 9821
public void testInvalidIndexNamesCorrectOpType() {
BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex().setIndex("INVALID.NAME").setType("type1").setId("1").setSource("field", 1))
.add(client().prepareUpdate().setIndex("INVALID.NAME").setType("type1").setId("1").setDoc("field", randomInt()))
.add(client().prepareDelete().setIndex("INVALID.NAME").setType("type1").setId("1")).get();
assertThat(bulkResponse.getItems().length, is(3));
assertThat(bulkResponse.getItems()[0].getOpType(), is("index"));
assertThat(bulkResponse.getItems()[1].getOpType(), is("update"));
assertThat(bulkResponse.getItems()[2].getOpType(), is("delete"));
}
} }