Add replica ops with version conflict to translog
An operation that completed successfully on a primary can result in a version conflict on a replica due to the asynchronous nature of operations. When a replica operation results in a version conflict, the operation is not added to the translog. This leads to gaps in the translog which is problematic as it can lead to situations where a replica shard can never advance its local checkpoint. As such operations are just normal course of business for a replica shard, these operations should be treated as if they completed successfully. This commit adds these operations to the translog. Relates #22626
This commit is contained in:
parent
8e3f1dd689
commit
e6dc74f2bf
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
|
@ -29,6 +30,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.action.update.UpdateHelper;
|
||||
|
@ -65,9 +67,6 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
|
||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
|
||||
|
||||
/** Performs shard-level bulk (index, delete or update) operations */
|
||||
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
|
||||
|
||||
|
@ -235,6 +234,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return location;
|
||||
}
|
||||
|
||||
private static boolean isConflictException(final Exception e) {
|
||||
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
|
||||
}
|
||||
|
||||
private static class UpdateResultHolder {
|
||||
final BulkItemRequest replicaRequest;
|
||||
final Engine.Result operationResult;
|
||||
|
@ -392,7 +395,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
|| failure instanceof IndexShardClosedException
|
||||
: "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" +
|
||||
" failures. got " + failure;
|
||||
if (!ignoreReplicaException(failure)) {
|
||||
if (!TransportActions.isShardNotAvailableException(failure)) {
|
||||
throw failure;
|
||||
}
|
||||
} else {
|
||||
|
@ -401,7 +404,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
if (!TransportActions.isShardNotAvailableException(e)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ public class ReplicationOperation<
|
|||
shard,
|
||||
replicaRequest),
|
||||
replicaException);
|
||||
if (ignoreReplicaException(replicaException)) {
|
||||
if (TransportActions.isShardNotAvailableException(replicaException)) {
|
||||
decPendingAndFinishIfNeeded();
|
||||
} else {
|
||||
RestStatus restStatus = ExceptionsHelper.status(replicaException);
|
||||
|
@ -314,30 +314,6 @@ public class ReplicationOperation<
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Should an exception be ignored when the operation is performed on the replica.
|
||||
*/
|
||||
public static boolean ignoreReplicaException(Exception e) {
|
||||
if (TransportActions.isShardNotAvailableException(e)) {
|
||||
return true;
|
||||
}
|
||||
// on version conflict or document missing, it means
|
||||
// that a new change has crept into the replica, and it's fine
|
||||
if (isConflictException(e)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static boolean isConflictException(Throwable t) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(t);
|
||||
// on version conflict or document missing, it means
|
||||
// that a new change has crept into the replica, and it's fine
|
||||
return cause instanceof VersionConflictEngineException;
|
||||
}
|
||||
|
||||
|
||||
public interface Primary<
|
||||
Request extends ReplicationRequest<Request>,
|
||||
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
|
||||
|
|
|
@ -477,10 +477,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
||||
if (op.origin().isRecovery()) {
|
||||
// version conflict, but okay
|
||||
result = onSuccess.get();
|
||||
} else {
|
||||
if (op.origin() == Operation.Origin.PRIMARY) {
|
||||
// fatal version conflict
|
||||
final VersionConflictEngineException e =
|
||||
new VersionConflictEngineException(
|
||||
|
@ -489,8 +486,13 @@ public class InternalEngine extends Engine {
|
|||
op.id(),
|
||||
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
||||
result = onFailure.apply(e);
|
||||
} else {
|
||||
/*
|
||||
* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
|
||||
* successful result.
|
||||
*/
|
||||
result = onSuccess.get();
|
||||
}
|
||||
|
||||
return Optional.of(result);
|
||||
} else {
|
||||
return Optional.empty();
|
||||
|
@ -672,7 +674,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
final long expectedVersion = index.version();
|
||||
final Optional<IndexResult> checkVersionConflictResult =
|
||||
final Optional<IndexResult> resultOnVersionConflict =
|
||||
checkVersionConflict(
|
||||
index,
|
||||
currentVersion,
|
||||
|
@ -682,15 +684,15 @@ public class InternalEngine extends Engine {
|
|||
e -> new IndexResult(e, currentVersion, index.seqNo()));
|
||||
|
||||
final IndexResult indexResult;
|
||||
if (checkVersionConflictResult.isPresent()) {
|
||||
indexResult = checkVersionConflictResult.get();
|
||||
if (resultOnVersionConflict.isPresent()) {
|
||||
indexResult = resultOnVersionConflict.get();
|
||||
} else {
|
||||
// no version conflict
|
||||
if (index.origin() == Operation.Origin.PRIMARY) {
|
||||
seqNo = seqNoService().generateSeqNo();
|
||||
}
|
||||
|
||||
/**
|
||||
/*
|
||||
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
|
||||
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
|
||||
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
|
||||
|
@ -707,10 +709,12 @@ public class InternalEngine extends Engine {
|
|||
update(index.uid(), index.docs(), indexWriter);
|
||||
}
|
||||
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
|
||||
}
|
||||
if (!indexResult.hasFailure()) {
|
||||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Index(index, indexResult))
|
||||
: null;
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
|
||||
indexResult.setTranslogLocation(location);
|
||||
}
|
||||
indexResult.setTook(System.nanoTime() - index.startTime());
|
||||
|
@ -804,7 +808,7 @@ public class InternalEngine extends Engine {
|
|||
|
||||
final long expectedVersion = delete.version();
|
||||
|
||||
final Optional<DeleteResult> result =
|
||||
final Optional<DeleteResult> resultOnVersionConflict =
|
||||
checkVersionConflict(
|
||||
delete,
|
||||
currentVersion,
|
||||
|
@ -812,10 +816,9 @@ public class InternalEngine extends Engine {
|
|||
deleted,
|
||||
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
|
||||
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
|
||||
|
||||
final DeleteResult deleteResult;
|
||||
if (result.isPresent()) {
|
||||
deleteResult = result.get();
|
||||
if (resultOnVersionConflict.isPresent()) {
|
||||
deleteResult = resultOnVersionConflict.get();
|
||||
} else {
|
||||
if (delete.origin() == Operation.Origin.PRIMARY) {
|
||||
seqNo = seqNoService().generateSeqNo();
|
||||
|
@ -824,11 +827,14 @@ public class InternalEngine extends Engine {
|
|||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
|
||||
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
|
||||
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
|
||||
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
|
||||
}
|
||||
if (!deleteResult.hasFailure()) {
|
||||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Delete(delete, deleteResult))
|
||||
: null;
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
|
||||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
deleteResult.setTook(System.nanoTime() - delete.startTime());
|
||||
|
|
|
@ -1478,76 +1478,121 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testVersioningReplicaConflict1() {
|
||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
|
||||
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
|
||||
final Engine.IndexResult v1Result = engine.index(v1Index);
|
||||
assertThat(v1Result.getVersion(), equalTo(1L));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(2L));
|
||||
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
|
||||
final Engine.IndexResult v2Result = engine.index(v2Index);
|
||||
assertThat(v2Result.getVersion(), equalTo(2L));
|
||||
|
||||
// apply the second index to the replica, should work fine
|
||||
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(2L));
|
||||
final Engine.Index replicaV2Index = new Engine.Index(
|
||||
newUid("1"),
|
||||
doc,
|
||||
v2Result.getSeqNo(),
|
||||
v2Index.primaryTerm(),
|
||||
v2Result.getVersion(),
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
|
||||
REPLICA,
|
||||
0,
|
||||
-1,
|
||||
false);
|
||||
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
|
||||
assertThat(replicaV2Result.getVersion(), equalTo(2L));
|
||||
|
||||
long seqNo = indexResult.getSeqNo();
|
||||
long primaryTerm = index.primaryTerm();
|
||||
// now, the old one should not work
|
||||
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertTrue(indexResult.hasFailure());
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
// now, the old one should produce an indexing result
|
||||
final Engine.Index replicaV1Index = new Engine.Index(
|
||||
newUid("1"),
|
||||
doc,
|
||||
v1Result.getSeqNo(),
|
||||
v1Index.primaryTerm(),
|
||||
v1Result.getVersion(),
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
|
||||
REPLICA,
|
||||
0,
|
||||
-1,
|
||||
false);
|
||||
final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
|
||||
assertFalse(replicaV1Result.hasFailure());
|
||||
assertFalse(replicaV1Result.isCreated());
|
||||
assertThat(replicaV1Result.getVersion(), equalTo(2L));
|
||||
|
||||
// second version on replica should fail as well
|
||||
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 2L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(2L));
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index);
|
||||
assertFalse(replicaV2Result.hasFailure());
|
||||
assertFalse(replicaV1Result.isCreated());
|
||||
assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L));
|
||||
}
|
||||
|
||||
public void testVersioningReplicaConflict2() {
|
||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
|
||||
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
|
||||
final Engine.IndexResult v1Result = engine.index(v1Index);
|
||||
assertThat(v1Result.getVersion(), equalTo(1L));
|
||||
|
||||
// apply the first index to the replica, should work fine
|
||||
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), 1L,
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
final Engine.Index replicaV1Index = new Engine.Index(
|
||||
newUid("1"),
|
||||
doc,
|
||||
v1Result.getSeqNo(),
|
||||
v1Index.primaryTerm(),
|
||||
v1Result.getVersion(),
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
|
||||
REPLICA,
|
||||
0,
|
||||
-1,
|
||||
false);
|
||||
Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
|
||||
assertThat(replicaV1Result.getVersion(), equalTo(1L));
|
||||
|
||||
// index it again
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(2L));
|
||||
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
|
||||
final Engine.IndexResult v2Result = engine.index(v2Index);
|
||||
assertThat(v2Result.getVersion(), equalTo(2L));
|
||||
|
||||
// now delete it
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
|
||||
Engine.DeleteResult deleteResult = engine.delete(delete);
|
||||
final Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
|
||||
final Engine.DeleteResult deleteResult = engine.delete(delete);
|
||||
assertThat(deleteResult.getVersion(), equalTo(3L));
|
||||
|
||||
// apply the delete on the replica (skipping the second index)
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
|
||||
deleteResult = replicaEngine.delete(delete);
|
||||
assertThat(deleteResult.getVersion(), equalTo(3L));
|
||||
final Engine.Delete replicaDelete = new Engine.Delete(
|
||||
"test",
|
||||
"1",
|
||||
newUid("1"),
|
||||
deleteResult.getSeqNo(),
|
||||
delete.primaryTerm(),
|
||||
deleteResult.getVersion(),
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
|
||||
REPLICA,
|
||||
0);
|
||||
final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete);
|
||||
assertThat(replicaDeleteResult.getVersion(), equalTo(3L));
|
||||
|
||||
// second time delete with same version should fail
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
|
||||
deleteResult = replicaEngine.delete(delete);
|
||||
assertTrue(deleteResult.hasFailure());
|
||||
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
// second time delete with same version should just produce the same version
|
||||
final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete);
|
||||
assertFalse(deleteReplayResult.hasFailure());
|
||||
assertTrue(deleteReplayResult.isFound());
|
||||
assertThat(deleteReplayResult.getVersion(), equalTo(3L));
|
||||
|
||||
// now do the second index on the replica, it should fail
|
||||
index = new Engine.Index(newUid("1"), doc, deleteResult.getSeqNo(), delete.primaryTerm(), 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertTrue(indexResult.hasFailure());
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
// now do the second index on the replica, it should result in the current version
|
||||
final Engine.Index replicaV2Index = new Engine.Index(
|
||||
newUid("1"),
|
||||
doc,
|
||||
v2Result.getSeqNo(),
|
||||
v2Index.primaryTerm(),
|
||||
v2Result.getVersion(),
|
||||
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
|
||||
REPLICA,
|
||||
0,
|
||||
-1,
|
||||
false);
|
||||
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
|
||||
assertFalse(replicaV2Result.hasFailure());
|
||||
assertFalse(replicaV2Result.isCreated());
|
||||
assertThat(replicaV2Result.getVersion(), equalTo(3L));
|
||||
}
|
||||
|
||||
public void testBasicCreatedFlag() {
|
||||
|
|
Loading…
Reference in New Issue