Bulk operation can create duplicates on primary relocation

When a bulk request containing index (create) operations with auto-generated ids executes while the primary is relocating, and the relocation completes after N items of the bulk have already been processed, the full shard bulk request is retried on the new primary. This can create duplicates, because the retried request is not marked as potentially holding conflicts.
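
As an illustration, the vulnerable request shape is an ordinary bulk of creates that sets no ids. This is a hypothetical client-side sketch, not part of the commit: the index, type, and field names are made up, and a connected Client is assumed.

    BulkRequestBuilder bulk = client.prepareBulk();
    for (int i = 0; i < 10000; i++) {
        // no id set, so one is auto-generated; these are exactly the items
        // a retried shard bulk could index a second time before this change
        bulk.add(client.prepareIndex("test", "doc").setSource("counter", i));
    }
    BulkResponse response = bulk.get();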

This change carries each item's response over at the request level: if a conflict is detected on the primary shard and the item already has a response (indicating it was executed once before), the stored response is used as the actual response for that bulk shard item.
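
Condensed from the TransportShardBulkAction hunks below (an excerpt for readability, not a standalone compilable unit), the primary-side pattern for an index item is:

    try {
        WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
        IndexResponse indexResponse = result.response();
        setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
    } catch (Throwable e) {
        // a conflict plus an already-recorded response means the first
        // execution succeeded, so reuse that response instead of failing
        if (item.getPrimaryResponse() != null && isConflictException(e)) {
            setResponse(item, item.getPrimaryResponse());
        } else {
            setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
                    new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
        }
    }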

On top of that, when a primary operation fails and is retried, the request is now marked as potentially causing duplicates, so the underlying implementation performs the extra lookup needed to detect them.
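
The marking lives in the shared replication action (the last file in the diff); condensed from that hunk, the primary execution path flags the request before any retry runs:

    try {
        PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState,
                new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
        performReplicas(response);
    } catch (Throwable e) {
        // flag first: a retried execution may collide with documents
        // the earlier, partial execution already wrote
        internalRequest.request.setCanHaveDuplicates();
        if (retryPrimaryException(e)) {
            primaryOperationStarted.set(false);
            // retry handling continues as shown in the hunk below
        }
    }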

This change also fixes a bug in our exception handling on the replica: if a specific item fails with an exception we cannot ignore, we now fail the shard instead of silently swallowing the error.
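
Condensed from the replica-side hunks below: each item's operation now distinguishes ignorable failures (version conflicts, document-already-exists, shard-not-available, per ignoreReplicaException) from real ones, which are rethrown so the shard copy gets failed:

    try {
        indexShard.index(index);   // likewise for indexShard.create(create) and indexShard.delete(delete)
    } catch (Throwable e) {
        // previously every throwable was swallowed here ("ignore, we are
        // on backup"); now only recognized-ignorable failures are suppressed
        if (!ignoreReplicaException(e)) {
            throw e;
        }
    }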

closes #7729
Commit: 99f91f7616 (parent 12cbb3223a)
Author: Shay Banon
Date: 2014-09-15 18:27:30 +02:00
5 changed files with 120 additions and 39 deletions

src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java

@@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -36,8 +37,9 @@ import java.io.IOException;
public class BulkItemRequest implements Streamable {
private int id;
private ActionRequest request;
private volatile BulkItemResponse primaryResponse;
private volatile boolean ignoreOnReplica;
BulkItemRequest() {
@@ -63,6 +65,25 @@ public class BulkItemRequest implements Streamable {
return indicesRequest.indices()[0];
}
BulkItemResponse getPrimaryResponse() {
return primaryResponse;
}
void setPrimaryResponse(BulkItemResponse primaryResponse) {
this.primaryResponse = primaryResponse;
}
/**
* Marks this request to be ignored and *not* execute on a replica.
*/
void setIgnoreOnReplica() {
this.ignoreOnReplica = true;
}
boolean isIgnoreOnReplica() {
return ignoreOnReplica;
}
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
BulkItemRequest item = new BulkItemRequest();
item.readFrom(in);
@@ -81,6 +102,12 @@ public class BulkItemRequest implements Streamable {
request = new UpdateRequest();
}
request.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
if (in.readBoolean()) {
primaryResponse = BulkItemResponse.readBulkItem(in);
}
ignoreOnReplica = in.readBoolean();
}
}
@Override
@@ -94,5 +121,9 @@ public class BulkItemRequest implements Streamable {
out.writeByte((byte) 2);
}
request.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
out.writeOptionalStreamable(primaryResponse);
out.writeBoolean(ignoreOnReplica);
}
}
}

src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

@@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -94,8 +95,14 @@ public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShard
out.writeVInt(items.length);
for (BulkItemRequest item : items) {
if (item != null) {
out.writeBoolean(true);
item.writeTo(out);
// if we are serializing to a node that is pre 1.4, make sure to pass null to maintain
// the old behavior of putting null in the request to be ignored on the replicas
if (item.isIgnoreOnReplica() && out.getVersion().before(Version.V_1_4_0_Beta1)) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
item.writeTo(out);
}
} else {
out.writeBoolean(false);
}

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -137,7 +137,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.IndexingOperation[] ops = null;
final Set<String> mappingTypesToUpdate = Sets.newHashSet();
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
@@ -151,7 +150,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
// add the response
IndexResponse indexResponse = result.response();
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
if (result.mappingTypeToUpdate != null) {
mappingTypesToUpdate.add(result.mappingTypeToUpdate);
}
@@ -187,10 +186,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("{} failed to execute bulk item (index) {}", e, shardRequest.shardId, indexRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e));
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(e)) {
setResponse(item, item.getPrimaryResponse());
} else {
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
}
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
@@ -200,7 +204,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
try {
// add the response
DeleteResponse deleteResponse = shardDeleteOperation(request, deleteRequest, indexShard).response();
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
} catch (Throwable e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
@@ -215,10 +219,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("{} failed to execute bulk item (delete) {}", e, shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e));
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(e)) {
setResponse(item, item.getPrimaryResponse());
} else {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
}
}
} else if (item.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
@@ -247,7 +256,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse);
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
if (result.mappingTypeToUpdate != null) {
mappingTypesToUpdate.add(result.mappingTypeToUpdate);
}
@@ -258,20 +268,19 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
ops[requestIndex] = result.op;
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
break;
case DELETE:
DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse);
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
break;
case NONE:
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult);
request.items()[requestIndex] = null; // No need to go to the replica
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult));
item.setIgnoreOnReplica(); // no need to go to the replica
break;
}
// NOTE: Breaking out of the retry_on_conflict loop!
@@ -281,10 +290,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (updateResult.retry) {
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
// we can't try any more
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t));
request.items()[requestIndex] = null; // do not send to replicas
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
}
} else {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
@@ -295,8 +302,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
throw (ElasticsearchException) t;
}
if (updateResult.result == null) {
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(shardRequest.request.index(), updateRequest.type(), updateRequest.id(), t));
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(t)) {
setResponse(item, item.getPrimaryResponse());
} else if (updateResult.result == null) {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(shardRequest.request.index(), updateRequest.type(), updateRequest.id(), t)));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
@@ -307,8 +319,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("{} failed to execute bulk item (index) {}", t, shardRequest.shardId, indexRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t));
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
@@ -317,13 +329,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("{} failed to execute bulk item (delete) {}", t, shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t));
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
break;
}
}
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
// NOTE: Breaking out of the retry_on_conflict loop!
break;
}
@@ -332,9 +342,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
assert responses[requestIndex] != null; // we must have set a response somewhere.
assert item.getPrimaryResponse() != null;
assert preVersionTypes[requestIndex] != null;
}
for (String mappingTypToUpdate : mappingTypesToUpdate) {
@@ -351,10 +360,22 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// ignore
}
}
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(shardRequest.shardId, responses);
return new PrimaryResponse<>(shardRequest.request, response, ops);
}
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
request.setPrimaryResponse(response);
if (response.isFailed()) {
request.setIgnoreOnReplica();
}
}
static class WriteResult {
final Object response;
@@ -555,7 +576,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (item == null) {
if (item == null || item.isIgnoreOnReplica()) {
continue;
}
if (item.request() instanceof IndexRequest) {
@@ -574,7 +595,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
indexShard.create(create);
}
} catch (Throwable e) {
// ignore, we are on backup
// if it's not an ignorable replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {
throw e;
}
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
@@ -582,7 +607,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
} catch (Throwable e) {
// ignore, we are on backup
// if it's not an ignorable replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {
throw e;
}
}
}
}

src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java

@@ -48,7 +48,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
private boolean threadedOperation = true;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private boolean canHaveDuplicates = false;
private volatile boolean canHaveDuplicates = false;
protected ShardReplicationOperationRequest() {

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

@@ -45,6 +45,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
@@ -162,16 +163,28 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
/**
* Should an exception be ignored when the operation is performed on the replica.
*/
boolean ignoreReplicaException(Throwable e) {
protected boolean ignoreReplicaException(Throwable e) {
if (TransportActions.isShardNotAvailableException(e)) {
return true;
}
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (isConflictException(e)) {
return true;
}
return false;
}
protected boolean isConflictException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
if (cause instanceof DocumentAlreadyExistsException) {
return true;
}
return false;
}
@@ -536,6 +549,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
performReplicas(response);
} catch (Throwable e) {
internalRequest.request.setCanHaveDuplicates();
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
primaryOperationStarted.set(false);