Allow abort of bulk items before processing (#26434)

Adds support for bulk items to be aborted before they are processed by TransportShardBulkAction.
This can be used by an ActionFilter to reject a subset of the items in a bulk action without rejecting the whole action (or all the items for a shard).
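
For illustration only (not part of this change): a minimal sketch of how a shard-level filter might use the new BulkItemRequest#abort to reject selected items while letting the rest proceed. The BulkItemAborter class and the ItemPolicy callback are made up for the example; it is placed in the org.elasticsearch.action.bulk package and relies only on the accessors visible in the diff below.

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;

// Hypothetical helper: aborts individual items of a shard-level bulk request
// without failing the request (or the remaining items) as a whole.
public final class BulkItemAborter {

    /** Hypothetical per-item check supplied by the caller (e.g. an ActionFilter implementation). */
    public interface ItemPolicy {
        boolean isAllowed(BulkItemRequest item);
    }

    public static void abortDeniedItems(BulkShardRequest request, ItemPolicy policy) {
        for (BulkItemRequest item : request.items()) {
            if (policy.isAllowed(item) == false) {
                // abort() stores an aborted Failure as the item's primary response, so
                // TransportShardBulkAction skips the item when it executes the shard request
                item.abort(request.index(), new ElasticsearchStatusException(
                        "item [" + item.id() + "] rejected by filter", RestStatus.FORBIDDEN));
            }
        }
    }

    private BulkItemAborter() {}
}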
Tim Vernum 2017-08-31 21:23:14 +10:00 committed by GitHub
parent adad605081
commit eb87df9ff9
5 changed files with 112 additions and 11 deletions

build.gradle

@@ -186,7 +186,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
// Prepare for backport of #26434
ext.bwc_tests_enabled = false
}
task verifyBwcTestsEnabled {

BulkItemRequest.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
import java.util.Objects;
public class BulkItemRequest implements Streamable {
@@ -63,6 +64,23 @@ public class BulkItemRequest implements Streamable {
this.primaryResponse = primaryResponse;
}
/**
* Abort this request, and store a {@link org.elasticsearch.action.bulk.BulkItemResponse.Failure} response.
*
* @param index The concrete index that was resolved for this request
* @param cause The cause of the rejection (may not be null)
* @throws IllegalStateException If a response already exists for this request
*/
public void abort(String index, Exception cause) {
if (primaryResponse != null) {
assert false : "Response already exists " + primaryResponse.status();
throw new IllegalStateException("Item already has a response (status=" + primaryResponse.status() + ")");
}
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.type(), request.id(),
Objects.requireNonNull(cause), true);
setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
}
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
BulkItemRequest item = new BulkItemRequest();
item.readFrom(in);
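
The javadoc above pins down the contract: a successful abort records a Failure flagged as aborted, with the status derived from the cause, and any second response is an error. A hypothetical unit-test-style sketch of that contract (not one of the tests added by this commit), kept in the same package so the package-visible getPrimaryResponse() accessor can be used:

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

public class BulkItemAbortContractTests extends ESTestCase {

    public void testAbortRecordsAnAbortedFailure() {
        BulkItemRequest item = new BulkItemRequest(0,
                new IndexRequest("index", "type", "id_0").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"));

        item.abort("index", new ElasticsearchStatusException("denied", RestStatus.FORBIDDEN));

        BulkItemResponse response = item.getPrimaryResponse();
        assertTrue(response.isFailed());                       // the stored response is a failure...
        assertTrue(response.getFailure().isAborted());         // ...flagged as aborted...
        assertEquals(RestStatus.FORBIDDEN, response.status()); // ...with the status taken from the cause
        // calling abort() a second time would trip the assertion / IllegalStateException shown above
    }
}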

BulkItemResponse.java

@@ -173,6 +173,7 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
private final Exception cause;
private final RestStatus status;
private final long seqNo;
private final boolean aborted;
/**
* For write failures before operation was assigned a sequence number.
@@ -181,25 +182,30 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
* to record operation sequence no with failure
*/
public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO);
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
}
public Failure(String index, String type, String id, Exception cause, boolean aborted) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO, aborted);
}
public Failure(String index, String type, String id, Exception cause, RestStatus status) {
this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO);
this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
}
/** For write failures after operation was assigned a sequence number. */
public Failure(String index, String type, String id, Exception cause, long seqNo) {
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo);
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo, false);
}
public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) {
public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo, boolean aborted) {
this.index = index;
this.type = type;
this.id = id;
this.cause = cause;
this.status = status;
this.seqNo = seqNo;
this.aborted = aborted;
}
/**
@@ -216,6 +222,11 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
if (supportsAbortedFlag(in.getVersion())) {
aborted = in.readBoolean();
} else {
aborted = false;
}
}
@Override
@@ -227,8 +238,15 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(getSeqNo());
}
if (supportsAbortedFlag(out.getVersion())) {
out.writeBoolean(aborted);
}
}
private static boolean supportsAbortedFlag(Version version) {
// The "aborted" flag was added for 5.5.3 and 5.6.0, but was not in 6.0.0-beta2
return version.after(Version.V_6_0_0_beta2) || (version.major == 5 && version.onOrAfter(Version.V_5_5_3));
}
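
A minimal sketch (not from this commit) of what that version gate means for serialization: on a stream whose version supports the flag, the flag is written and read back; on older streams nothing extra is written and readers default it to false. The class name is made up, it assumes the writeTo / StreamInput-constructor pair shown in this diff, and it sits in the same package purely to avoid visibility questions.

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

public class AbortedFlagWireExample {

    public static void main(String[] args) throws Exception {
        BulkItemResponse.Failure original = new BulkItemResponse.Failure(
                "my-index", "my-type", "1", new ElasticsearchException("rejected by filter"), true);

        BytesStreamOutput out = new BytesStreamOutput();
        out.setVersion(Version.CURRENT);        // CURRENT is after 6.0.0-beta2, so the flag is written
        original.writeTo(out);

        StreamInput in = out.bytes().streamInput();
        in.setVersion(Version.CURRENT);
        BulkItemResponse.Failure copy = new BulkItemResponse.Failure(in);

        assert copy.isAborted();                // the flag survives the round trip
        // with both sides pinned to e.g. 6.0.0-beta1, supportsAbortedFlag(...) is false,
        // so nothing extra goes on the wire and the reader falls back to aborted = false
    }
}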
/**
* The index name of the action.
@@ -281,6 +299,15 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
return seqNo;
}
/**
* Whether this failure is the result of an <em>abort</em>.
* If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
* @see BulkItemRequest#abort(String, Exception)
*/
public boolean isAborted() {
return aborted;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
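
The isAborted javadoc above is aimed at callers that drive retries. A hypothetical client-side helper (not part of this commit) that honours it, using only the public BulkItemResponse and Failure accessors; the TOO_MANY_REQUESTS rule is just an example policy:

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.rest.RestStatus;

public final class BulkRetryPolicy {

    /** Decides whether a failed bulk item is worth retrying. */
    public static boolean shouldRetry(BulkItemResponse item) {
        if (item.isFailed() == false) {
            return false;                       // the item succeeded, nothing to retry
        }
        BulkItemResponse.Failure failure = item.getFailure();
        if (failure.isAborted()) {
            return false;                       // aborted items must never be retried
        }
        return failure.getStatus() == RestStatus.TOO_MANY_REQUESTS;
    }

    private BulkRetryPolicy() {}
}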

TransportShardBulkAction.java

@@ -54,7 +54,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
@@ -120,8 +119,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
Translog.Location location = null;
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
updateHelper, nowInMillisSupplier, mappingUpdater);
}
}
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
@@ -260,6 +261,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return calculateTranslogLocation(location, responseHolder);
}
private static boolean isAborted(BulkItemResponse response) {
return response != null && response.isFailed() && response.getFailure().isAborted();
}
private static boolean isConflictException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}

TransportShardBulkActionTests.java

@@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@@ -30,14 +31,12 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
@@ -59,9 +58,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
import static org.junit.Assert.assertNotNull;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.any;
@@ -207,6 +207,56 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
closeShards(shard);
}
public void testSkipBulkIndexRequestIfAborted() throws Exception {
IndexShard shard = newStartedShard(true);
BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)];
for (int i = 0; i < items.length; i++) {
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id_" + i)
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar-" + i)
.opType(DocWriteRequest.OpType.INDEX);
items[i] = new BulkItemRequest(i, writeRequest);
}
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
// Preemptively abort one of the bulk items, but allow the others to proceed
BulkItemRequest rejectItem = randomFrom(items);
RestStatus rejectionStatus = randomFrom(RestStatus.BAD_REQUEST, RestStatus.CONFLICT, RestStatus.FORBIDDEN, RestStatus.LOCKED);
final ElasticsearchStatusException rejectionCause = new ElasticsearchStatusException("testing rejection", rejectionStatus);
rejectItem.abort("index", rejectionCause);
UpdateHelper updateHelper = null;
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result = TransportShardBulkAction.performOnPrimary(
bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
// since at least 1 item passed, the translog location should exist,
assertThat(result.location, notNullValue());
// and the response should exist and match the item count
assertThat(result.finalResponseIfSuccessful, notNullValue());
assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length));
// check each response matches the input item, including the rejection
for (int i = 0; i < items.length; i++) {
BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i];
assertThat(response.getItemId(), equalTo(i));
assertThat(response.getIndex(), equalTo("index"));
assertThat(response.getType(), equalTo("type"));
assertThat(response.getId(), equalTo("id_" + i));
assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
if (response.getItemId() == rejectItem.id()) {
assertTrue(response.isFailed());
assertThat(response.getFailure().getCause(), equalTo(rejectionCause));
assertThat(response.status(), equalTo(rejectionStatus));
} else {
assertFalse(response.isFailed());
}
}
// Check that the non-rejected updates made it to the shard
assertDocCount(shard, items.length - 1);
closeShards(shard);
}
public void testExecuteBulkIndexRequestWithRejection() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);