Fix race between replica reset and primary promotion (#32442)
We've recently seen a number of test failures that tripped an assertion in IndexShard (see the issues linked below), leading to the discovery of a race between resetting a replica when it learns about a higher term and promoting that same replica to primary.

This commit fixes the race by distinguishing between a cluster-state primary term (called pendingPrimaryTerm) and a shard-level operation term. The former is set during the cluster state update or when a replica learns about a new primary; the latter is only incremented under the operation block, which can happen in a delayed fashion.

The commit also solves the issue where a replica that is still adjusting to the new term receives a cluster state update that promotes it to primary, which can happen when multiple nodes are shut down in short succession. In that case, the cluster state update thread would call `asyncBlockOperations` in `updateShardState`, which in turn would throw an exception, as blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard. This commit therefore extends IndexShardOperationPermits to allow it to queue multiple blocks (all of which take precedence over operations acquiring permits).

Finally, it moves the primary activation of the replication tracker under the operation block, so that the actual transition to primary only happens while operations are blocked.

Relates to #32431, #32304 and #32118
parent 9dcf3f5aee
commit 0d60e8a029
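For orientation before the diff: the heart of the change is that IndexShard now tracks two terms, and only a queued permit block may advance the operation term. The sketch below is an illustrative reduction of that mechanism, not code from this commit — the permit machinery is stubbed out with a plain queue, and only the names (pendingPrimaryTerm, operationPrimaryTerm, bumpPrimaryTerm) are taken from the diff that follows.

    import java.util.ArrayDeque;
    import java.util.Queue;

    /**
     * Condensed sketch of the two-term scheme (illustration only).
     * pendingPrimaryTerm is what the cluster state says the term should be;
     * operationPrimaryTerm is what operations actually use, and it only advances
     * once a queued "block" runs, i.e. once all in-flight operations have drained.
     */
    class TwoTermShard {
        private volatile long pendingPrimaryTerm = 1;
        private volatile long operationPrimaryTerm = 1;

        // Stand-in for IndexShardOperationPermits.asyncBlockOperations: blocks are
        // queued and may run later; the commit allows several to be queued at once.
        private final Queue<Runnable> queuedBlocks = new ArrayDeque<>();

        synchronized void bumpPrimaryTerm(long newPrimaryTerm, Runnable onBlocked) {
            assert newPrimaryTerm > pendingPrimaryTerm;
            queuedBlocks.add(() -> {
                // Blocks are not guaranteed to run in submission order, so guard
                // against a later bump having already advanced the operation term.
                if (operationPrimaryTerm < newPrimaryTerm) {
                    operationPrimaryTerm = newPrimaryTerm;
                    onBlocked.run();
                }
            });
            pendingPrimaryTerm = newPrimaryTerm; // visible to cluster-state logic at once
        }

        // Stand-in for the moment all permits are drained and a queued block runs.
        synchronized void drainAndRunNextBlock() {
            Runnable block = queuedBlocks.poll();
            if (block != null) {
                block.run();
            }
        }

        public static void main(String[] args) {
            TwoTermShard shard = new TwoTermShard();
            shard.bumpPrimaryTerm(2, () -> System.out.println("replica reset for term 2"));
            // A promotion arrives before the first block has run; it just queues
            // another block instead of failing the shard.
            shard.bumpPrimaryTerm(3, () -> System.out.println("promoted to primary in term 3"));
            System.out.println("pending=" + shard.pendingPrimaryTerm
                + " operation=" + shard.operationPrimaryTerm); // pending=3 operation=1
            shard.drainAndRunNextBlock(); // term 2 reset runs
            shard.drainAndRunNextBlock(); // term 3 promotion runs
            System.out.println("pending=" + shard.pendingPrimaryTerm
                + " operation=" + shard.operationPrimaryTerm); // pending=3 operation=3
        }
    }

In the real change, IndexShardOperationPermits.asyncBlockOperations plays the role of the queue: its former `delayed` boolean becomes the `queuedBlockOperations` counter visible in the diff below.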
@@ -144,7 +144,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         switch (indexResult.getResultType()) {
             case SUCCESS:
                 IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
-                        indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
+                        indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
                 return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
             case FAILURE:
                 return new BulkItemResultHolder(null, indexResult, bulkItemRequest);

@@ -161,7 +161,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         switch (deleteResult.getResultType()) {
             case SUCCESS:
                 DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
-                        deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
+                        deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
                 return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
             case FAILURE:
                 return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);

@@ -300,7 +300,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             assert result instanceof Engine.IndexResult : result.getClass();
             final IndexRequest updateIndexRequest = translate.action();
             final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
-                    result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
+                    result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
             updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
                 indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
                 indexResponse.getResult());

@@ -320,7 +320,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             final DeleteRequest updateDeleteRequest = translate.action();

             final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
-                    result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
+                    result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());

             updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
                 deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),

@@ -356,7 +356,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         } catch (Exception failure) {
             // we may fail translating a update to index or delete operation
             // we use index result to communicate failure while translating update request
-            final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
+            final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
             return new BulkItemResultHolder(null, result, primaryItemRequest);
         }

@@ -559,7 +559,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             () ->
                 primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
                     request.getAutoGeneratedTimestamp(), request.isRetry()),
-            e -> new Engine.IndexResult(e, request.version()),
+            e -> primary.getFailedIndexResult(e, request.version()),
             mappingUpdater);
     }

@@ -567,7 +567,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                                                          MappingUpdatePerformer mappingUpdater) throws Exception {
         return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
             () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
-            e -> new Engine.DeleteResult(e, request.version()),
+            e -> primary.getFailedDeleteResult(e, request.version()),
             mappingUpdater);
     }
@@ -929,7 +929,7 @@ public abstract class TransportReplicationAction<
             if (actualAllocationId.equals(allocationId) == false) {
                 throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
             }
-            final long actualTerm = indexShard.getPrimaryTerm();
+            final long actualTerm = indexShard.getPendingPrimaryTerm();
             if (actualTerm != primaryTerm) {
                 throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
                     primaryTerm, actualTerm);

@@ -983,7 +983,7 @@ public abstract class TransportReplicationAction<
         }

         public boolean isRelocated() {
-            return indexShard.isPrimaryMode() == false;
+            return indexShard.isRelocatedPrimary();
         }

         @Override
@@ -304,6 +304,7 @@ public abstract class Engine implements Closeable {
         private final Operation.TYPE operationType;
         private final Result.Type resultType;
         private final long version;
+        private final long term;
         private final long seqNo;
         private final Exception failure;
         private final SetOnce<Boolean> freeze = new SetOnce<>();

@@ -311,19 +312,21 @@ public abstract class Engine implements Closeable {
         private Translog.Location translogLocation;
         private long took;

-        protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
+        protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
             this.operationType = operationType;
             this.failure = Objects.requireNonNull(failure);
             this.version = version;
+            this.term = term;
             this.seqNo = seqNo;
             this.requiredMappingUpdate = null;
             this.resultType = Type.FAILURE;
         }

-        protected Result(Operation.TYPE operationType, long version, long seqNo) {
+        protected Result(Operation.TYPE operationType, long version, long term, long seqNo) {
             this.operationType = operationType;
             this.version = version;
             this.seqNo = seqNo;
+            this.term = term;
             this.failure = null;
             this.requiredMappingUpdate = null;
             this.resultType = Type.SUCCESS;

@@ -333,6 +336,7 @@ public abstract class Engine implements Closeable {
             this.operationType = operationType;
             this.version = Versions.NOT_FOUND;
             this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            this.term = 0L;
             this.failure = null;
             this.requiredMappingUpdate = requiredMappingUpdate;
             this.resultType = Type.MAPPING_UPDATE_REQUIRED;

@@ -357,6 +361,10 @@ public abstract class Engine implements Closeable {
             return seqNo;
         }

+        public long getTerm() {
+            return term;
+        }
+
         /**
          * If the operation was aborted due to missing mappings, this method will return the mappings
          * that are required to complete the operation.

@@ -415,20 +423,20 @@ public abstract class Engine implements Closeable {

         private final boolean created;

-        public IndexResult(long version, long seqNo, boolean created) {
-            super(Operation.TYPE.INDEX, version, seqNo);
+        public IndexResult(long version, long term, long seqNo, boolean created) {
+            super(Operation.TYPE.INDEX, version, term, seqNo);
             this.created = created;
         }

         /**
          * use in case of the index operation failed before getting to internal engine
          **/
-        public IndexResult(Exception failure, long version) {
-            this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
+        public IndexResult(Exception failure, long version, long term) {
+            this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
         }

-        public IndexResult(Exception failure, long version, long seqNo) {
-            super(Operation.TYPE.INDEX, failure, version, seqNo);
+        public IndexResult(Exception failure, long version, long term, long seqNo) {
+            super(Operation.TYPE.INDEX, failure, version, term, seqNo);
             this.created = false;
         }

@@ -447,20 +455,20 @@ public abstract class Engine implements Closeable {

         private final boolean found;

-        public DeleteResult(long version, long seqNo, boolean found) {
-            super(Operation.TYPE.DELETE, version, seqNo);
+        public DeleteResult(long version, long term, long seqNo, boolean found) {
+            super(Operation.TYPE.DELETE, version, term, seqNo);
             this.found = found;
         }

         /**
          * use in case of the delete operation failed before getting to internal engine
          **/
-        public DeleteResult(Exception failure, long version) {
-            this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
+        public DeleteResult(Exception failure, long version, long term) {
+            this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
         }

-        public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
-            super(Operation.TYPE.DELETE, failure, version, seqNo);
+        public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
+            super(Operation.TYPE.DELETE, failure, version, term, seqNo);
             this.found = found;
         }

@@ -477,12 +485,12 @@ public abstract class Engine implements Closeable {

     public static class NoOpResult extends Result {

-        NoOpResult(long seqNo) {
-            super(Operation.TYPE.NO_OP, 0, seqNo);
+        NoOpResult(long term, long seqNo) {
+            super(Operation.TYPE.NO_OP, term, 0, seqNo);
         }

-        NoOpResult(long seqNo, Exception failure) {
-            super(Operation.TYPE.NO_OP, failure, 0, seqNo);
+        NoOpResult(long term, long seqNo, Exception failure) {
+            super(Operation.TYPE.NO_OP, failure, term, 0, seqNo);
         }

     }
@@ -736,6 +736,10 @@ public class InternalEngine extends Engine {
         return localCheckpointTracker.generateSeqNo();
     }

+    private long getPrimaryTerm() {
+        return engineConfig.getPrimaryTermSupplier().getAsLong();
+    }
+
     @Override
     public IndexResult index(Index index) throws IOException {
         assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();

@@ -788,7 +792,7 @@ public class InternalEngine extends Engine {
                     indexResult = indexIntoLucene(index, plan);
                 } else {
                     indexResult = new IndexResult(
-                            plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
+                            plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                 }
                 if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                     final Translog.Location location;

@@ -900,7 +904,7 @@ public class InternalEngine extends Engine {
                     currentVersion, index.version(), currentNotFoundOrDeleted)) {
                 final VersionConflictEngineException e =
                         new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
-                plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
+                plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
             } else {
                 plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
                     generateSeqNoForOperation(index),

@@ -930,7 +934,7 @@ public class InternalEngine extends Engine {
                 assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                 addDocs(index.docs(), indexWriter);
             }
-            return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
+            return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
         } catch (Exception ex) {
             if (indexWriter.getTragicException() == null) {
                 /* There is no tragic event recorded so this must be a document failure.

@@ -946,7 +950,7 @@ public class InternalEngine extends Engine {
                  * we return a `MATCH_ANY` version to indicate no document was index. The value is
                  * not used anyway
                  */
-                return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
+                return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
             } else {
                 throw ex;
             }

@@ -1019,8 +1023,8 @@ public class InternalEngine extends Engine {
         }

         static IndexingStrategy skipDueToVersionConflict(
-                VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
-            final IndexResult result = new IndexResult(e, currentVersion);
+                VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
+            final IndexResult result = new IndexResult(e, currentVersion, term);
             return new IndexingStrategy(
                     currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
         }

@@ -1097,7 +1101,7 @@ public class InternalEngine extends Engine {
                     deleteResult = deleteInLucene(delete, plan);
                 } else {
                     deleteResult = new DeleteResult(
-                            plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
+                            plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
                 }
                 if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                     final Translog.Location location;

@@ -1178,7 +1182,7 @@ public class InternalEngine extends Engine {
         final DeletionStrategy plan;
         if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
             final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
-            plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
+            plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
         } else {
             plan = DeletionStrategy.processNormally(
                     currentlyDeleted,

@@ -1201,12 +1205,12 @@ public class InternalEngine extends Engine {
                     new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
                         engineConfig.getThreadPool().relativeTimeInMillis()));
             return new DeleteResult(
-                    plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
+                    plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
         } catch (Exception ex) {
             if (indexWriter.getTragicException() == null) {
                 // there is no tragic event and such it must be a document level failure
                 return new DeleteResult(
-                        ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
+                        ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
             } else {
                 throw ex;
             }

@@ -1237,9 +1241,9 @@ public class InternalEngine extends Engine {
         }

         static DeletionStrategy skipDueToVersionConflict(
-                VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
+                VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
             final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
-            final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
+            final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
             return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
         }

@@ -1268,7 +1272,7 @@ public class InternalEngine extends Engine {
         try (ReleasableLock ignored = readLock.acquire()) {
             noOpResult = innerNoOp(noOp);
         } catch (final Exception e) {
-            noOpResult = new NoOpResult(noOp.seqNo(), e);
+            noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
         }
         return noOpResult;
     }

@@ -1278,7 +1282,7 @@ public class InternalEngine extends Engine {
         assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
-            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
+            final NoOpResult noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
             if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
@@ -85,6 +85,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      * computation from that point on.
      */
     volatile boolean primaryMode;
+
     /**
      * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
      * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the

@@ -102,6 +103,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      */
     boolean handoffInProgress;

+    /**
+     * Boolean flag that indicates whether a relocation handoff completed (see {@link #completeRelocationHandoff}).
+     */
+    volatile boolean relocated;
+
     /**
      * The global checkpoint tracker relies on the property that cluster state updates are applied in-order. After transferring a primary
      * context from the primary relocation source to the target and initializing the target, it is possible for the target to apply a

@@ -260,6 +266,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         return primaryMode;
     }

+    /**
+     * Returns whether the replication tracker has relocated away to another shard copy.
+     */
+    public boolean isRelocated() {
+        return relocated;
+    }
+
     /**
      * Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication
      * as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however.

@@ -287,6 +300,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         // relocation handoff can only occur in primary mode
         assert !handoffInProgress || primaryMode;

+        // a relocated copy is not in primary mode
+        assert !relocated || !primaryMode;
+
         // the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode
         assert !primaryMode || checkpoints.get(shardAllocationId).inSync;

@@ -766,8 +782,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         assert invariant();
         assert primaryMode;
         assert handoffInProgress;
+        assert relocated == false;
         primaryMode = false;
         handoffInProgress = false;
+        relocated = true;
         // forget all checkpoint information except for global checkpoint of current shard
         checkpoints.entrySet().stream().forEach(e -> {
             final CheckpointState cps = e.getValue();
@@ -52,6 +52,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;

@@ -192,7 +193,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

     protected volatile ShardRouting shardRouting;
     protected volatile IndexShardState state;
-    protected volatile long primaryTerm;
+    protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
+    protected volatile long operationPrimaryTerm;
     protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
     final EngineFactory engineFactory;

@@ -315,7 +317,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
         searcherWrapper = indexSearcherWrapper;
-        primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
+        pendingPrimaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
+        operationPrimaryTerm = pendingPrimaryTerm;
         refreshListeners = buildRefreshListeners();
         lastSearcherAccess.set(threadPool.relativeTimeInMillis());
         persistMetadata(path, indexSettings, shardRouting, null, logger);

@@ -365,10 +368,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     /**
-     * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
+     * USE THIS METHOD WITH CARE!
+     * Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
+     * a new term due to a new primary, the term that's exposed here will not be the term that the shard internally uses to assign
+     * to operations. The shard will auto-correct its internal operation term, but this might take time.
+     * See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
      */
-    public long getPrimaryTerm() {
-        return this.primaryTerm;
+    public long getPendingPrimaryTerm() {
+        return this.pendingPrimaryTerm;
     }

     /**

@@ -418,7 +425,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

                 changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
-            } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
+            } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isRelocated() &&
                 (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
                 // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard routing occur (e.g. due to recovery
                 // failure / cancellation). The reason is that at the moment we cannot safely reactivate primary mode without risking two

@@ -431,7 +438,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             final CountDownLatch shardStateUpdated = new CountDownLatch(1);

             if (newRouting.primary()) {
-                if (newPrimaryTerm == primaryTerm) {
+                if (newPrimaryTerm == pendingPrimaryTerm) {
                     if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
                         // the master started a recovering primary, activate primary mode.
                         replicationTracker.activatePrimaryMode(getLocalCheckpoint());

@@ -454,10 +461,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     assert newRouting.initializing() == false :
                         "a started primary shard should never update its term; "
                             + "shard " + newRouting + ", "
-                            + "current term [" + primaryTerm + "], "
+                            + "current term [" + pendingPrimaryTerm + "], "
                             + "new term [" + newPrimaryTerm + "]";
-                    assert newPrimaryTerm > primaryTerm :
-                        "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
+                    assert newPrimaryTerm > pendingPrimaryTerm :
+                        "primary terms can only go up; current term [" + pendingPrimaryTerm + "], new term [" + newPrimaryTerm + "]";
                     /*
                      * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
                      * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
@@ -468,12 +475,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     if (resyncStarted == false) {
                         throw new IllegalStateException("cannot start resync while it's already in progress");
                     }
-                    indexShardOperationPermits.asyncBlockOperations(
-                        30,
-                        TimeUnit.MINUTES,
+                    bumpPrimaryTerm(newPrimaryTerm,
                         () -> {
                             shardStateUpdated.await();
+                            assert pendingPrimaryTerm == newPrimaryTerm :
+                                "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
+                                ", current routing: " + currentRouting + ", new routing: " + newRouting;
+                            assert operationPrimaryTerm == newPrimaryTerm;
                             try {
+                                replicationTracker.activatePrimaryMode(getLocalCheckpoint());
                                 /*
                                  * If this shard was serving as a replica shard when another shard was promoted to primary then the state of
                                  * its local checkpoint tracker was reset during the primary term transition. In particular, the local

@@ -517,10 +527,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             } catch (final AlreadyClosedException e) {
                                 // okay, the index was deleted
                             }
-                        },
-                        e -> failShard("exception during primary term transition", e));
-                    replicationTracker.activatePrimaryMode(getLocalCheckpoint());
-                    primaryTerm = newPrimaryTerm;
+                        });
                 }
             }
             // set this last, once we finished updating all internal state.

@@ -528,8 +535,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

             assert this.shardRouting.primary() == false ||
                 this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
+                this.indexShardOperationPermits.isBlocked() || // if permits are blocked, we are still transitioning
                 this.replicationTracker.isPrimaryMode()
-                : "an started primary must be in primary mode " + this.shardRouting;
+                : "a started primary with non-pending operation term must be in primary mode " + this.shardRouting;
             shardStateUpdated.countDown();
         }
         if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {

@@ -590,7 +598,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 consumer.accept(primaryContext);
                 synchronized (mutex) {
                     verifyRelocatingState();
-                    replicationTracker.completeRelocationHandoff(); // make changes to primaryMode flag only under mutex
+                    replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
                 }
             } catch (final Exception e) {
                 try {
@@ -655,21 +663,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
                                                            long autoGeneratedTimestamp, boolean isRetry) throws IOException {
         assert versionType.validateVersionForWrites(version);
-        return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
+        return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
             isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
     }

     public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
                                                            boolean isRetry, SourceToParse sourceToParse)
         throws IOException {
-        return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
+        return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
             Engine.Operation.Origin.REPLICA, sourceToParse);
     }

     private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
                                                    long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
                                                    SourceToParse sourceToParse) throws IOException {
-        assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
+        assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+            + "]";
         ensureWriteAllowed(origin);
         Engine.Index operation;
         try {

@@ -686,7 +695,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             // can not raise an exception that may block any replication of previous operations to the
             // replicas
             verifyNotClosed(e);
-            return new Engine.IndexResult(e, version, seqNo);
+            return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
         }

         return index(getEngine(), operation);

@@ -723,12 +732,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
-        return markSeqNoAsNoop(seqNo, primaryTerm, reason, Engine.Operation.Origin.REPLICA);
+        return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
     }

     private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason,
                                               Engine.Operation.Origin origin) throws IOException {
-        assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
+        assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+            + "]";
         long startTime = System.nanoTime();
         ensureWriteAllowed(origin);
         final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);

@@ -743,20 +753,29 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return engine.noOp(noOp);
     }

+    public Engine.IndexResult getFailedIndexResult(Exception e, long version) {
+        return new Engine.IndexResult(e, version, operationPrimaryTerm);
+    }
+
+    public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
+        return new Engine.DeleteResult(e, version, operationPrimaryTerm);
+    }
+
     public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
         throws IOException {
         assert versionType.validateVersionForWrites(version);
-        return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
+        return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
             Engine.Operation.Origin.PRIMARY);
     }

     public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
-        return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
+        return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
     }

     private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
                                                      @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
-        assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
+        assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+            + "]";
         ensureWriteAllowed(origin);
         // When there is a single type, the unique identifier is only composed of the _id,
         // so there is no way to differenciate foo#1 from bar#1. This is especially an issue

@@ -772,7 +791,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 return new Engine.DeleteResult(update);
             }
         } catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) {
-            return new Engine.DeleteResult(e, version, seqNo, false);
+            return new Engine.DeleteResult(e, version, operationPrimaryTerm, seqNo, false);
         }
         final Term uid = extractUidForDelete(type, id);
         final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,

@@ -1209,7 +1228,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
-        getEngine().trimOperationsFromTranslog(primaryTerm, aboveSeqNo);
+        getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
     }

     public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {

@@ -2082,10 +2101,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     /**
-     * Returns whether the shard is in primary mode, i.e., in charge of replicating changes (see {@link ReplicationTracker}).
+     * Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
      */
-    public boolean isPrimaryMode() {
-        return replicationTracker.isPrimaryMode();
+    public boolean isRelocatedPrimary() {
+        assert shardRouting.primary() : "only call isRelocatedPrimary on primary shard";
+        return replicationTracker.isRelocated();
     }

     class ShardEventListener implements Engine.EventListener {

@@ -2175,7 +2195,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm);
     }

     /**
@@ -2194,7 +2214,25 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
     }

-    private final Object primaryTermMutex = new Object();
+    private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
+        assert Thread.holdsLock(mutex);
+        assert newPrimaryTerm > pendingPrimaryTerm;
+        assert operationPrimaryTerm <= pendingPrimaryTerm;
+        final CountDownLatch termUpdated = new CountDownLatch(1);
+        indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
+                assert operationPrimaryTerm <= pendingPrimaryTerm;
+                termUpdated.await();
+                // indexShardOperationPermits doesn't guarantee that async submissions are executed
+                // in the order submitted. We need to guard against another term bump
+                if (operationPrimaryTerm < newPrimaryTerm) {
+                    operationPrimaryTerm = newPrimaryTerm;
+                    onBlocked.run();
+                }
+            },
+            e -> failShard("exception during primary term transition", e));
+        pendingPrimaryTerm = newPrimaryTerm;
+        termUpdated.countDown();
+    }

     /**
      * Acquire a replica operation permit whenever the shard is ready for indexing (see

@@ -2203,7 +2241,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
      * name.
      *
-     * @param operationPrimaryTerm the operation primary term
+     * @param opPrimaryTerm        the operation primary term
      * @param globalCheckpoint     the global checkpoint associated with the request
      * @param onPermitAcquired     the listener for permit acquisition
      * @param executorOnDelay      the name of the executor to invoke the listener on if permit acquisition is delayed

@@ -2211,15 +2249,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *                             the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
      *                             isn't used
      */
-    public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
+    public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint,
                                               final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
                                               final Object debugInfo) {
         verifyNotClosed();
         verifyReplicationTarget();
-        final boolean globalCheckpointUpdated;
-        if (operationPrimaryTerm > primaryTerm) {
-            synchronized (primaryTermMutex) {
-                if (operationPrimaryTerm > primaryTerm) {
+        if (opPrimaryTerm > pendingPrimaryTerm) {
+            synchronized (mutex) {
+                if (opPrimaryTerm > pendingPrimaryTerm) {
                     IndexShardState shardState = state();
                     // only roll translog and update primary term if shard has made it past recovery
                     // Having a new primary term here means that the old primary failed and that there is a new primary, which again

@@ -2229,11 +2266,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         shardState != IndexShardState.STARTED) {
                         throw new IndexShardNotStartedException(shardId, shardState);
                     }
-                    try {
-                        indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
-                            assert operationPrimaryTerm > primaryTerm :
-                                "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
-                            primaryTerm = operationPrimaryTerm;
+
+                    if (opPrimaryTerm > pendingPrimaryTerm) {
+                        bumpPrimaryTerm(opPrimaryTerm, () -> {
                             updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
                             final long currentGlobalCheckpoint = getGlobalCheckpoint();
                             final long localCheckpoint;

@@ -2244,42 +2279,33 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             }
                             logger.trace(
                                 "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
-                                operationPrimaryTerm,
+                                opPrimaryTerm,
                                 getLocalCheckpoint(),
                                 localCheckpoint);
                             getEngine().resetLocalCheckpoint(localCheckpoint);
                             getEngine().rollTranslogGeneration();
                         });
-                        globalCheckpointUpdated = true;
-                    } catch (final Exception e) {
-                        onPermitAcquired.onFailure(e);
-                        return;
                     }
-                } else {
-                    globalCheckpointUpdated = false;
                 }
             }
-        } else {
-            globalCheckpointUpdated = false;
         }

-        assert operationPrimaryTerm <= primaryTerm
-                : "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
+        assert opPrimaryTerm <= pendingPrimaryTerm
+                : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
         indexShardOperationPermits.acquire(
                 new ActionListener<Releasable>() {
                     @Override
                     public void onResponse(final Releasable releasable) {
-                        if (operationPrimaryTerm < primaryTerm) {
+                        if (opPrimaryTerm < operationPrimaryTerm) {
                             releasable.close();
                             final String message = String.format(
                                     Locale.ROOT,
                                     "%s operation primary term [%d] is too old (current [%d])",
                                     shardId,
-                                    operationPrimaryTerm,
-                                    primaryTerm);
+                                    opPrimaryTerm,
+                                    operationPrimaryTerm);
                             onPermitAcquired.onFailure(new IllegalStateException(message));
                         } else {
-                            if (globalCheckpointUpdated == false) {
                             try {
                                 updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
                             } catch (Exception e) {

@@ -2287,7 +2313,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                                 onPermitAcquired.onFailure(e);
                                 return;
                             }
-                            }
                             onPermitAcquired.onResponse(releasable);
                         }
                     }
@@ -19,7 +19,6 @@

 package org.elasticsearch.index.shard;

-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;

@@ -29,10 +28,12 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

@@ -59,7 +60,7 @@ final class IndexShardOperationPermits implements Closeable {
     final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
     private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed
     private volatile boolean closed;
-    private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
+    private int queuedBlockOperations; // does not need to be volatile as all accesses are done under a lock on this

     // only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics.
     // Value is a tuple, with a some debug information supplied by the caller and a stack trace of the acquiring thread

@@ -102,9 +103,6 @@ final class IndexShardOperationPermits implements Closeable {
             final long timeout,
             final TimeUnit timeUnit,
             final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
-        if (closed) {
-            throw new IndexShardClosedException(shardId);
-        }
         delayOperations();
         try {
             doBlockOperations(timeout, timeUnit, onBlocked);

@@ -147,13 +145,12 @@ final class IndexShardOperationPermits implements Closeable {
     }

     private void delayOperations() {
-        synchronized (this) {
-            if (delayed) {
-                throw new IllegalStateException("operations are already delayed");
-            } else {
-                assert delayedOperations.isEmpty();
-                delayed = true;
-            }
+        if (closed) {
+            throw new IndexShardClosedException(shardId);
+        }
+        synchronized (this) {
+            assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
+            queuedBlockOperations++;
         }
     }

@@ -164,7 +161,7 @@ final class IndexShardOperationPermits implements Closeable {
         if (Assertions.ENABLED) {
             // since delayed is not volatile, we have to synchronize even here for visibility
             synchronized (this) {
-                assert delayed;
+                assert queuedBlockOperations > 0;
             }
         }
         if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {

@@ -182,10 +179,14 @@ final class IndexShardOperationPermits implements Closeable {
     private void releaseDelayedOperations() {
         final List<DelayedOperation> queuedActions;
         synchronized (this) {
-            assert delayed;
+            assert queuedBlockOperations > 0;
+            queuedBlockOperations--;
+            if (queuedBlockOperations == 0) {
                 queuedActions = new ArrayList<>(delayedOperations);
                 delayedOperations.clear();
-            delayed = false;
+            } else {
+                queuedActions = Collections.emptyList();
+            }
         }
         if (!queuedActions.isEmpty()) {
             /*

@@ -242,7 +243,7 @@ final class IndexShardOperationPermits implements Closeable {
         final Releasable releasable;
         try {
             synchronized (this) {
-                if (delayed) {
+                if (queuedBlockOperations > 0) {
                     final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                     final ActionListener<Releasable> wrappedListener;
                     if (executorOnDelay != null) {

@@ -308,6 +309,11 @@ final class IndexShardOperationPermits implements Closeable {
         }
     }

+
+    synchronized boolean isBlocked() {
+        return queuedBlockOperations > 0;
+    }
+
     /**
      * @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied
      * when the permit was acquired plus a stack traces that was captured when the permit was request.
@@ -136,7 +136,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
                 }
             };

-            resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
+            resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
                 startingSeqNo, maxSeqNo, resyncListener);
         } catch (Exception e) {
             try {
@@ -394,7 +394,7 @@ final class StoreRecovery {
                 final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
                 final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
                 final String translogUUID = Translog.createEmptyTranslog(
-                    indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
+                    indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
                 store.associateIndexWithNewTranslog(translogUUID);
             } else if (indexShouldExists) {
                 // since we recover from local, just fill the files and size

@@ -409,11 +409,12 @@ final class StoreRecovery {
             } else {
                 store.createEmpty();
                 final String translogUUID = Translog.createEmptyTranslog(
-                    indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPrimaryTerm());
+                    indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
+                    indexShard.getPendingPrimaryTerm());
                 store.associateIndexWithNewTranslog(translogUUID);
             }
             indexShard.openEngineAndRecoverFromTranslog();
-            indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
+            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
             indexShard.finalizeRecovery();
             indexShard.postRecovery("post recovery from shard_store");
         } catch (EngineException | IOException e) {

@@ -458,11 +459,11 @@ final class StoreRecovery {
             final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
             final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
             final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
+                indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
             store.associateIndexWithNewTranslog(translogUUID);
             assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
             indexShard.openEngineAndRecoverFromTranslog();
-            indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
+            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");
         } catch (Exception e) {
@@ -491,7 +491,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             if (operation.primaryTerm() > current.getPrimaryTerm()) {
-                throw new IllegalArgumentException("Operation term is newer than the current term;"
+                assert false :
+                    "Operation term is newer than the current term; "
+                        + "current term[" + current.getPrimaryTerm() + "], operation term[" + operation + "]";
+                throw new IllegalArgumentException("Operation term is newer than the current term; "
                     + "current term[" + current.getPrimaryTerm() + "], operation term[" + operation + "]");
             }
             return current.add(bytes, operation.seqNo());
@@ -250,7 +250,7 @@ public class RecoverySourceHandler {
             try (Releasable ignored = FutureUtils.get(permit)) {
                 // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
                 // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
-                if (primary.isPrimaryMode() == false) {
+                if (primary.isRelocatedPrimary()) {
                     throw new IndexShardRelocatedException(primary.shardId());
                 }
                 runnable.run();
@@ -443,7 +443,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
             }
             // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
             final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPrimaryTerm());
+                indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
+                indexShard.getPendingPrimaryTerm());
             store.associateIndexWithNewTranslog(translogUUID);
         } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             // this is a fatal exception at this stage.
@ -441,7 +441,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
|
||||
|
||||
Exception err = new ElasticsearchException("I'm dead <(x.x)>");
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0);
|
||||
BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
|
||||
replicaRequest);
|
||||
|
||||
|
@ -478,7 +478,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
|
||||
Exception err = new VersionConflictEngineException(shardId, "_doc", "id",
|
||||
"I'm conflicted <(;_;)>");
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0);
|
||||
BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
|
||||
replicaRequest);
|
||||
|
||||
|
@ -516,7 +516,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
|
||||
boolean created = randomBoolean();
|
||||
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
|
||||
Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation);
|
||||
Engine.IndexResult indexResult = new FakeResult(1, 1, 1, created, resultLocation);
|
||||
DocWriteResponse indexResponse = new IndexResponse(shardId, "_doc", "id", 1, 17, 1, created);
|
||||
BulkItemResultHolder goodResults =
|
||||
new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
|
||||
|
@ -559,7 +559,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
Translog.Location newLocation = new Translog.Location(1, 1, 1);
|
||||
final long version = randomNonNegativeLong();
|
||||
final long seqNo = randomNonNegativeLong();
|
||||
Engine.IndexResult indexResult = new IndexResultWithLocation(version, seqNo, created, newLocation);
|
||||
Engine.IndexResult indexResult = new IndexResultWithLocation(version, 0L, seqNo, created, newLocation);
|
||||
results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
|
||||
assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
|
||||
equalTo(newLocation));
|
||||
|
@ -629,8 +629,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
|
||||
public class IndexResultWithLocation extends Engine.IndexResult {
|
||||
private final Translog.Location location;
|
||||
public IndexResultWithLocation(long version, long seqNo, boolean created, Translog.Location newLocation) {
|
||||
super(version, seqNo, created);
|
||||
public IndexResultWithLocation(long version, long term, long seqNo, boolean created, Translog.Location newLocation) {
|
||||
super(version, term, seqNo, created);
|
||||
this.location = newLocation;
|
||||
}
|
||||
|
||||
|
@ -647,8 +647,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
BulkItemRequest request = new BulkItemRequest(0, updateRequest);
|
||||
Exception err = new VersionConflictEngineException(shardId, "_doc", "id",
|
||||
"I'm conflicted <(;_;)>");
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
|
||||
Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, true);
|
||||
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0);
|
||||
Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, 1, true);
|
||||
DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED;
|
||||
DocWriteResponse.Result deleteWriteResult = DocWriteResponse.Result.DELETED;
|
||||
IndexRequest indexRequest = new IndexRequest("index", "_doc", "id");
|
||||
|
@ -830,8 +830,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
|
||||
private final Translog.Location location;
|
||||
|
||||
protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) {
|
||||
super(version, seqNo, created);
|
||||
protected FakeResult(long version, long term, long seqNo, boolean created, Translog.Location location) {
|
||||
super(version, term, seqNo, created);
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
|
|
|
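Across these fixtures, Engine.IndexResult, Engine.DeleteResult, and their test subclasses gain a term argument, so each engine result records the term under which it actually executed. A hypothetical sketch of the idea (the OpResult class and its names are illustrative, not Elasticsearch API):

    // Hypothetical sketch, not Elasticsearch code: once a shard's visible term
    // can advance while an operation is still in flight, the term captured when
    // the engine executed the operation is the only value safe to report back.
    final class OpResult {
        private final long seqNo;
        private final long term;    // term in effect when the engine ran the op
        private final long version;

        OpResult(long seqNo, long term, long version) {
            this.seqNo = seqNo;
            this.term = term;
            this.version = version;
        }

        // responses read the term from the result, not from the live shard,
        // whose term may already have moved on
        long termForResponse() {
            return term;
        }
    }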
@@ -587,8 +587,6 @@ public class TransportReplicationActionTests extends ESTestCase {

    public void testPrimaryReference() throws Exception {
        final IndexShard shard = mock(IndexShard.class);
        final long primaryTerm = 1 + randomInt(200);
        when(shard.getPrimaryTerm()).thenReturn(primaryTerm);

        AtomicBoolean closed = new AtomicBoolean();
        Releasable releasable = () -> {

@@ -683,9 +681,9 @@ public class TransportReplicationActionTests extends ESTestCase {

        final IndexShard shard = mock(IndexShard.class);
        when(shard.getPrimaryTerm()).thenReturn(primaryTerm);
        when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
        when(shard.routingEntry()).thenReturn(routingEntry);
        when(shard.isPrimaryMode()).thenReturn(true);
        when(shard.isRelocatedPrimary()).thenReturn(false);
        IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
        Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
            clusterService.state().metaData().index(index).inSyncAllocationIds(0);

@@ -1201,7 +1199,7 @@ public class TransportReplicationActionTests extends ESTestCase {
        doAnswer(invocation -> {
            long term = (Long)invocation.getArguments()[0];
            ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
            final long primaryTerm = indexShard.getPrimaryTerm();
            final long primaryTerm = indexShard.getPendingPrimaryTerm();
            if (term < primaryTerm) {
                throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
                    shardId, term, primaryTerm));

@@ -1219,9 +1217,9 @@ public class TransportReplicationActionTests extends ESTestCase {
            }
            return routing;
        });
        when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
        when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get());
        doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
        when(indexShard.getPrimaryTerm()).thenAnswer(i ->
        when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
            clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
        return indexShard;
    }

@@ -454,7 +454,7 @@ public class TransportWriteActionTests extends ESTestCase {
        doAnswer(invocation -> {
            long term = (Long)invocation.getArguments()[0];
            ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
            final long primaryTerm = indexShard.getPrimaryTerm();
            final long primaryTerm = indexShard.getPendingPrimaryTerm();
            if (term < primaryTerm) {
                throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
                    shardId, term, primaryTerm));

@@ -472,9 +472,9 @@ public class TransportWriteActionTests extends ESTestCase {
            }
            return routing;
        });
        when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
        when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get());
        doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
        when(indexShard.getPrimaryTerm()).thenAnswer(i ->
        when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
            clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
        return indexShard;
    }

@@ -76,7 +76,7 @@ public class ShardStateIT extends ESIntegTestCase {
            if (indexService != null) {
                for (IndexShard shard : indexService) {
                    assertThat("term mismatch for shard " + shard.shardId(),
                        shard.getPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id())));
                        shard.getPendingPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id())));
                }
            }
        }
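Both mocked acquireReplicaOperationPermit implementations above encode the same rule: an operation stamped with a term lower than the shard's pending primary term is rejected as stale. A minimal standalone sketch of that gate, assuming nothing beyond plain Java (TermGate is an illustrative name, not an Elasticsearch class):

    // Minimal sketch of the stale-term gate the mocks above emulate;
    // illustrative only, not the IndexShard implementation.
    final class TermGate {
        private long pendingPrimaryTerm; // highest term learned so far

        synchronized void advanceTo(long newTerm) {
            pendingPrimaryTerm = Math.max(pendingPrimaryTerm, newTerm);
        }

        // called before granting a permit to a replica-side operation
        synchronized void checkOperationTerm(long opTerm) {
            if (opTerm < pendingPrimaryTerm) {
                throw new IllegalArgumentException(
                    "operation term [" + opTerm + "] is too old (current [" + pendingPrimaryTerm + "])");
            }
        }
    }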
@@ -59,8 +59,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.hamcrest.Matchers.anyOf;

@@ -221,7 +223,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
            }

            logger.info("--> promoting replica to primary " + replica1.routingEntry());
            shards.promoteReplicaToPrimary(replica1);
            shards.promoteReplicaToPrimary(replica1).get();
            indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON);
            shards.index(indexRequest);
            shards.refresh("test");

@@ -234,6 +236,102 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
        }
    }

    public void testReplicaTermIncrementWithConcurrentPrimaryPromotion() throws Exception {
        Map<String, String> mappings =
            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
            shards.startAll();
            long primaryPrimaryTerm = shards.getPrimary().getPendingPrimaryTerm();
            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica1 = replicas.get(0);
            IndexShard replica2 = replicas.get(1);

            shards.promoteReplicaToPrimary(replica1, (shard, listener) -> {});
            long newReplica1Term = replica1.getPendingPrimaryTerm();
            assertEquals(primaryPrimaryTerm + 1, newReplica1Term);

            assertEquals(primaryPrimaryTerm, replica2.getPendingPrimaryTerm());

            IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
            BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, replica1);

            CyclicBarrier barrier = new CyclicBarrier(2);
            Thread t1 = new Thread(() -> {
                try {
                    barrier.await();
                    indexOnReplica(replicationRequest, shards, replica2, newReplica1Term);
                } catch (IllegalStateException ise) {
                    assertThat(ise.getMessage(), containsString("is too old"));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Thread t2 = new Thread(() -> {
                try {
                    barrier.await();
                    shards.promoteReplicaToPrimary(replica2).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            t2.start();
            t1.start();
            t1.join();
            t2.join();

            assertEquals(newReplica1Term + 1, replica2.getPendingPrimaryTerm());
        }
    }

    public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exception {
        Map<String, String> mappings =
            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1, mappings))) {
            shards.startAll();
            long primaryPrimaryTerm = shards.getPrimary().getPendingPrimaryTerm();
            IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
            BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary());

            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica = replicas.get(0);

            CyclicBarrier barrier = new CyclicBarrier(2);
            AtomicBoolean successFullyIndexed = new AtomicBoolean();
            Thread t1 = new Thread(() -> {
                try {
                    barrier.await();
                    indexOnReplica(replicationRequest, shards, replica, primaryPrimaryTerm);
                    successFullyIndexed.set(true);
                } catch (IllegalStateException ise) {
                    assertThat(ise.getMessage(), containsString("is too old"));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Thread t2 = new Thread(() -> {
                try {
                    barrier.await();
                    shards.promoteReplicaToPrimary(replica).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            t2.start();
            t1.start();
            t1.join();
            t2.join();

            assertEquals(primaryPrimaryTerm + 1, replica.getPendingPrimaryTerm());
            if (successFullyIndexed.get()) {
                try (Translog.Snapshot snapshot = getTranslog(replica).newSnapshot()) {
                    assertThat(snapshot.totalOperations(), equalTo(1));
                    Translog.Operation op = snapshot.next();
                    assertThat(op.primaryTerm(), equalTo(primaryPrimaryTerm));
                }
            }
        }
    }
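Both new tests race an in-flight replica operation against a concurrent promotion using the same rendezvous idiom. Stripped of the Elasticsearch machinery, the pattern looks roughly like this (a self-contained sketch; the thread bodies are placeholders for the operations the tests perform):

    import java.util.concurrent.CyclicBarrier;

    // Self-contained sketch of the rendezvous idiom used by the two tests
    // above: a CyclicBarrier releases both threads at once so the replica-side
    // write and the promotion interleave as tightly as possible.
    public class PromotionRaceSketch {
        public static void main(String[] args) throws Exception {
            CyclicBarrier barrier = new CyclicBarrier(2);

            Thread write = new Thread(() -> {
                try {
                    barrier.await(); // rendezvous, then attempt the replica write
                    // placeholder: either the write lands under the old term,
                    // or it is rejected as "too old" after the term bump
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

            Thread promote = new Thread(() -> {
                try {
                    barrier.await(); // rendezvous, then promote (bumps the term)
                    // placeholder: promotion must succeed in every interleaving
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

            promote.start();
            write.start();
            write.join();
            promote.join();
            // post-conditions may only assert what holds in both interleavings
        }
    }

Because either interleaving is legal, the assertions after join() can only check properties that hold in both outcomes, which is exactly how the two tests above are written.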

    /**
     * test document failures (failures after seq_no generation) are added as noop operation to the translog
     * for primary and replica shards
@@ -255,7 +353,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                .source("{}", XContentType.JSON)
            );
            assertTrue(response.isFailed());
            assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPrimaryTerm(), failureMessage);
            assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
            shards.assertAllEqual(0);

            // add some replicas

@@ -269,7 +367,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                .source("{}", XContentType.JSON)
            );
            assertTrue(response.isFailed());
            assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPrimaryTerm(), failureMessage);
            assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
            shards.assertAllEqual(0);
        }
    }

@@ -361,7 +459,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
            // Make sure that peer-recovery transfers all but non-overridden operations.
            IndexShard replica3 = shards.addReplica();
            logger.info("--> Promote replica2 as the primary");
            shards.promoteReplicaToPrimary(replica2);
            shards.promoteReplicaToPrimary(replica2).get();
            logger.info("--> Recover replica3 from replica2");
            recoverReplica(replica3, replica2, true);
            try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
@@ -245,7 +245,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
            }
        }

        shards.promoteReplicaToPrimary(newPrimary);
        shards.promoteReplicaToPrimary(newPrimary).get();

        // check that local checkpoint of new primary is properly tracked after primary promotion
        assertThat(newPrimary.getLocalCheckpoint(), equalTo(totalDocs - 1L));

@@ -432,7 +432,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                while ((next = snapshot.next()) != null) {
                    translogOperations++;
                    assertThat("unexpected op: " + next, (int)next.seqNo(), lessThan(initialDocs + extraDocs));
                    assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPrimaryTerm()));
                    assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(),
                        is(oldPrimary.getPendingPrimaryTerm()));
                    final Translog.Source source = next.getSource();
                    assertThat(source.source.utf8ToString(), is("{ \"f\": \"normal\"}"));
                }
@@ -770,8 +770,10 @@ public class ReplicationTrackerTests extends ESTestCase {
        assertThat(newPrimary.routingTable, equalTo(oldPrimary.routingTable));
        assertThat(newPrimary.replicationGroup, equalTo(oldPrimary.replicationGroup));

        assertFalse(oldPrimary.relocated);
        oldPrimary.completeRelocationHandoff();
        assertFalse(oldPrimary.primaryMode);
        assertTrue(oldPrimary.relocated);
    }

    public void testIllegalStateExceptionIfUnknownAllocationId() {
@@ -71,7 +71,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
    public static void setupThreadPool() {
        int writeThreadPoolSize = randomIntBetween(1, 2);
        int writeThreadPoolQueueSize = randomIntBetween(1, 2);
        threadPool = new TestThreadPool("IndexShardOperationsLockTests",
        threadPool = new TestThreadPool("IndexShardOperationPermitsTests",
            Settings.builder()
                .put("thread_pool." + ThreadPool.Names.WRITE + ".size", writeThreadPoolSize)
                .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", writeThreadPoolQueueSize)

@@ -100,7 +100,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
        assertThat(permits.getActiveOperationsCount(), equalTo(0));
    }

    public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
    public void testAllOperationsInvoked() throws InterruptedException, TimeoutException {
        int numThreads = 10;

        class DummyException extends RuntimeException {}

@@ -187,7 +187,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
        future.get().close();
    }

    public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
    public void testOperationsIfClosed() {
        PlainActionFuture<Releasable> future = new PlainActionFuture<>();
        permits.close();
        permits.acquire(future, ThreadPool.Names.GENERIC, true, "");

@@ -195,10 +195,12 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
        assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
    }

    public void testBlockIfClosed() throws ExecutionException, InterruptedException {
    public void testBlockIfClosed() {
        permits.close();
        expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
            () -> { throw new IllegalArgumentException("fake error"); }));
        expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES,
            () -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); }));
    }

    public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {

@@ -210,6 +212,36 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
        future.get(1, TimeUnit.HOURS).close();
    }

    public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedException, TimeoutException {
        PlainActionFuture<Releasable> future = new PlainActionFuture<>();
        final CountDownLatch blockAcquired = new CountDownLatch(1);
        final CountDownLatch releaseBlock = new CountDownLatch(1);
        final AtomicBoolean blocked = new AtomicBoolean();
        try (Releasable ignored = blockAndWait()) {
            permits.acquire(future, ThreadPool.Names.GENERIC, true, "");

            permits.asyncBlockOperations(
                30,
                TimeUnit.MINUTES,
                () -> {
                    blocked.set(true);
                    blockAcquired.countDown();
                    releaseBlock.await();
                },
                e -> {
                    throw new RuntimeException(e);
                });
            assertFalse(blocked.get());
            assertFalse(future.isDone());
        }
        blockAcquired.await();
        assertTrue(blocked.get());
        assertFalse(future.isDone());
        releaseBlock.countDown();

        future.get(1, TimeUnit.HOURS).close();
    }
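The new testGetBlockWhenBlocked pins down the extended contract: a second block may be queued while another block is in place, and queued blocks win over permit acquisitions that are already waiting. A minimal sketch of how such semantics can be built on a fair java.util.concurrent.Semaphore (an illustration under stated assumptions, not the IndexShardOperationPermits implementation):

    import java.util.concurrent.Semaphore;

    // Illustrative sketch, not the Elasticsearch implementation: a fair
    // Semaphore hands out permits FIFO, so a blocker waiting on acquire(TOTAL)
    // gets ahead of any permit requests that arrive after it -- the "queued
    // blocks take precedence over operations" behavior the test exercises.
    final class QueuedBlockPermitsSketch {
        private static final int TOTAL = 1 << 20;                       // effectively "all"
        private final Semaphore semaphore = new Semaphore(TOTAL, true); // fair = FIFO

        // acquire one operation permit; close the returned handle to release it
        AutoCloseable acquire() throws InterruptedException {
            semaphore.acquire();
            return semaphore::release;
        }

        // run a block with every permit held; multiple callers simply queue up
        void blockOperations(Runnable onBlocked) throws InterruptedException {
            semaphore.acquire(TOTAL);
            try {
                onBlocked.run();
            } finally {
                semaphore.release(TOTAL);
            }
        }
    }

Fairness is the load-bearing choice here: with an unfair semaphore, a steady stream of single-permit operations could starve a queued block indefinitely.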

    /**
     * Tests that the ThreadContext is restored when a operation is executed after it has been delayed due to a block
     */
@@ -297,7 +297,7 @@ public class IndexShardTests extends IndexShardTestCase {
            // expected
        }
        try {
            indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
            indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
                ThreadPool.Names.WRITE, "");
            fail("we should not be able to increment anymore");
        } catch (IndexShardClosedException e) {

@@ -308,7 +308,7 @@ public class IndexShardTests extends IndexShardTestCase {
    public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
        IndexShard indexShard = newShard(false);
        expectThrows(IndexShardNotStartedException.class, () ->
            indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
            indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100),
                SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.WRITE, ""));
        closeShards(indexShard);
    }

@@ -331,7 +331,7 @@ public class IndexShardTests extends IndexShardTestCase {
                throw new RuntimeException(e);
            }
            indexShard.acquireReplicaOperationPermit(
                indexShard.getPrimaryTerm(),
                indexShard.getPendingPrimaryTerm(),
                indexShard.getGlobalCheckpoint(),
                new ActionListener<Releasable>() {
                    @Override

@@ -418,16 +418,13 @@ public class IndexShardTests extends IndexShardTestCase {
    }

    /**
     * This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to
     * a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following
     * should have happened
     * 1) Internal state (ala ReplicationTracker) have been updated
     * 2) Primary term is set to the new term
     * This test makes sure that people can use the shard routing entry + take an operation permit to check whether a shard was already
     * promoted to a primary.
     */
    public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
        final IndexShard indexShard = newShard(false);
        recoveryEmptyReplica(indexShard, randomBoolean());
        final long promotedTerm = indexShard.getPrimaryTerm() + 1;
        final long promotedTerm = indexShard.getPendingPrimaryTerm() + 1;
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final AtomicBoolean stop = new AtomicBoolean();
        final Thread thread = new Thread(() -> {

@@ -438,10 +435,14 @@ public class IndexShardTests extends IndexShardTestCase {
            }
            while(stop.get() == false) {
                if (indexShard.routingEntry().primary()) {
                    assertThat(indexShard.getPrimaryTerm(), equalTo(promotedTerm));
                    assertThat(indexShard.getPendingPrimaryTerm(), equalTo(promotedTerm));
                    final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
                    indexShard.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, "bla");
                    try (Releasable ignored = permitAcquiredFuture.actionGet()) {
                        assertThat(indexShard.getReplicationGroup(), notNullValue());
                    }
                }
            }
        });
        thread.start();

@@ -504,7 +505,7 @@ public class IndexShardTests extends IndexShardTestCase {

        // promote the replica
        final ShardRouting replicaRouting = indexShard.routingEntry();
        final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 10000);
        final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 10000);
        final ShardRouting primaryRouting =
            newShardRouting(
                replicaRouting.shardId(),

@@ -558,7 +559,7 @@ public class IndexShardTests extends IndexShardTestCase {
        ShardRouting replicaRouting = indexShard.routingEntry();
        ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
            true, ShardRoutingState.STARTED, replicaRouting.allocationId());
        final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 1000);
        final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000);
        indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {
            assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
        }, 0L,

@@ -568,11 +569,14 @@ public class IndexShardTests extends IndexShardTestCase {
        } else {
            indexShard = newStartedShard(true);
        }
        final long primaryTerm = indexShard.getPrimaryTerm();
        final long primaryTerm = indexShard.getPendingPrimaryTerm();
        assertEquals(0, indexShard.getActiveOperationsCount());
        if (indexShard.routingEntry().isRelocationTarget() == false) {
            try {
                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.WRITE, "");
                final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), permitAcquiredFuture,
                    ThreadPool.Names.WRITE, "");
                permitAcquiredFuture.actionGet();
                fail("shard shouldn't accept operations as replica");
            } catch (IllegalStateException ignored) {

@@ -650,7 +654,7 @@ public class IndexShardTests extends IndexShardTestCase {
            assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
        }

        final long primaryTerm = indexShard.getPrimaryTerm();
        final long primaryTerm = indexShard.getPendingPrimaryTerm();
        final long translogGen = engineClosed ? -1 : getTranslog(indexShard).getGeneration().translogFileGeneration;

        final Releasable operation1;

@@ -728,7 +732,7 @@ public class IndexShardTests extends IndexShardTestCase {
        ActionListener<Releasable> listener = new ActionListener<Releasable>() {
            @Override
            public void onResponse(Releasable releasable) {
                assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
                assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm));
                assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
                assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
                assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));

@@ -765,7 +769,7 @@ public class IndexShardTests extends IndexShardTestCase {
            barrier.await();
            if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) {
                barrier.await();
                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
                assertThat(indexShard.getPendingPrimaryTerm(), equalTo(primaryTerm));
                assertFalse(onResponse.get());
                assertThat(onFailure.get(), instanceOf(IndexShardNotStartedException.class));
                Releasables.close(operation1);

@@ -774,18 +778,19 @@ public class IndexShardTests extends IndexShardTestCase {
                // our operation should be blocked until the previous operations complete
                assertFalse(onResponse.get());
                assertNull(onFailure.get());
                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
                assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
                assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
                Releasables.close(operation1);
                // our operation should still be blocked
                assertFalse(onResponse.get());
                assertNull(onFailure.get());
                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
                assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
                assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
                Releasables.close(operation2);
                barrier.await();
                // now lock acquisition should have succeeded
                assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
                assertThat(indexShard.operationPrimaryTerm, equalTo(newPrimaryTerm));
                assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm));
                assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
                if (engineClosed) {
                    assertFalse(onResponse.get());

@@ -884,7 +889,7 @@ public class IndexShardTests extends IndexShardTestCase {

        final CountDownLatch latch = new CountDownLatch(1);
        indexShard.acquireReplicaOperationPermit(
            indexShard.getPrimaryTerm() + 1,
            indexShard.getPendingPrimaryTerm() + 1,
            globalCheckpoint,
            new ActionListener<Releasable>() {
                @Override

@@ -906,7 +911,7 @@ public class IndexShardTests extends IndexShardTestCase {
        final CountDownLatch resyncLatch = new CountDownLatch(1);
        indexShard.updateShardState(
            newRouting,
            indexShard.getPrimaryTerm() + 1,
            indexShard.getPendingPrimaryTerm() + 1,
            (s, r) -> resyncLatch.countDown(),
            1L,
            Collections.singleton(newRouting.allocationId().getId()),

@@ -938,7 +943,7 @@ public class IndexShardTests extends IndexShardTestCase {
            Math.toIntExact(indexShard.getLocalCheckpoint()));
        final CountDownLatch latch = new CountDownLatch(1);
        indexShard.acquireReplicaOperationPermit(
            indexShard.primaryTerm + 1,
            indexShard.pendingPrimaryTerm + 1,
            globalCheckpoint,
            new ActionListener<Releasable>() {
                @Override

@@ -975,7 +980,7 @@ public class IndexShardTests extends IndexShardTestCase {
        final CyclicBarrier barrier = new CyclicBarrier(3);
        final CountDownLatch latch = new CountDownLatch(2);

        final long primaryTerm = indexShard.getPrimaryTerm();
        final long primaryTerm = indexShard.getPendingPrimaryTerm();
        final AtomicLong counter = new AtomicLong();
        final AtomicReference<Exception> onFailure = new AtomicReference<>();

@@ -993,7 +998,7 @@ public class IndexShardTests extends IndexShardTestCase {
            @Override
            public void onResponse(Releasable releasable) {
                counter.incrementAndGet();
                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + increment));
                assertThat(indexShard.getPendingPrimaryTerm(), equalTo(primaryTerm + increment));
                latch.countDown();
                releasable.close();
            }

@@ -1037,7 +1042,7 @@ public class IndexShardTests extends IndexShardTestCase {
            assertThat(counter.get(), equalTo(2L));
        }

        assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + Math.max(firstIncrement, secondIncrement)));
        assertThat(indexShard.getPendingPrimaryTerm(), equalTo(primaryTerm + Math.max(firstIncrement, secondIncrement)));

        closeShards(indexShard);
    }

@@ -1416,14 +1421,14 @@ public class IndexShardTests extends IndexShardTestCase {
            recoveryThread.start();
            latch.await();
            // recovery can only be finalized after we release the current primaryOperationLock
            assertTrue(shard.isPrimaryMode());
            assertFalse(shard.isRelocatedPrimary());
        }
        // recovery can be now finalized
        recoveryThread.join();
        assertFalse(shard.isPrimaryMode());
        assertTrue(shard.isRelocatedPrimary());
        try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
            // lock can again be acquired
            assertFalse(shard.isPrimaryMode());
            assertTrue(shard.isRelocatedPrimary());
        }

        closeShards(shard);

@@ -1465,7 +1470,7 @@ public class IndexShardTests extends IndexShardTestCase {

    public void testStressRelocated() throws Exception {
        final IndexShard shard = newStartedShard(true);
        assertTrue(shard.isPrimaryMode());
        assertFalse(shard.isRelocatedPrimary());
        IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
        final int numThreads = randomIntBetween(2, 4);
        Thread[] indexThreads = new Thread[numThreads];

@@ -1501,14 +1506,14 @@ public class IndexShardTests extends IndexShardTestCase {
        assertThat(relocated.get(), equalTo(false));
        assertThat(shard.getActiveOperationsCount(), greaterThan(0));
        // ensure we only transition after pending operations completed
        assertTrue(shard.isPrimaryMode());
        assertFalse(shard.isRelocatedPrimary());
        // complete pending operations
        barrier.await();
        // complete recovery/relocation
        recoveryThread.join();
        // ensure relocated successfully once pending operations are done
        assertThat(relocated.get(), equalTo(true));
        assertFalse(shard.isPrimaryMode());
        assertTrue(shard.isRelocatedPrimary());
        assertThat(shard.getActiveOperationsCount(), equalTo(0));

        for (Thread indexThread : indexThreads) {

@@ -1572,7 +1577,7 @@ public class IndexShardTests extends IndexShardTestCase {
        cyclicBarrier.await();
        relocationThread.join();
        cancellingThread.join();
        if (shard.isPrimaryMode() == false) {
        if (shard.isRelocatedPrimary()) {
            logger.debug("shard was relocated successfully");
            assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class));
            assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true));

@@ -1719,7 +1724,7 @@ public class IndexShardTests extends IndexShardTestCase {
            while ((operation = snapshot.next()) != null) {
                if (operation.opType() == Translog.Operation.Type.NO_OP) {
                    numNoops++;
                    assertEquals(newShard.getPrimaryTerm(), operation.primaryTerm());
                    assertEquals(newShard.getPendingPrimaryTerm(), operation.primaryTerm());
                    assertEquals(0, operation.seqNo());
                }
            }

@@ -1826,7 +1831,7 @@ public class IndexShardTests extends IndexShardTestCase {
        flushShard(shard);
        assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
        // Simulate resync (without rollback): Noop #1, index #2
        acquireReplicaOperationPermitBlockingly(shard, shard.primaryTerm + 1);
        acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1);
        shard.markSeqNoAsNoop(1, "test");
        shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
            SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON));

@@ -1837,7 +1842,8 @@ public class IndexShardTests extends IndexShardTestCase {
        IndexShard newShard = reinitShard(shard,
            newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
                RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
        newShard.primaryTerm++;
        newShard.pendingPrimaryTerm++;
        newShard.operationPrimaryTerm++;
        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
        newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
        assertTrue(newShard.recoverFromStore());

@@ -1852,7 +1858,7 @@ public class IndexShardTests extends IndexShardTestCase {
        ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
        IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
        shard.relocated(primaryContext -> {});
        assertFalse(shard.isPrimaryMode());
        assertTrue(shard.isRelocatedPrimary());
        try {
            IndexShardTestCase.updateRoutingEntry(shard, origRouting);
            fail("Expected IndexShardRelocatedException");

@@ -2160,11 +2166,11 @@ public class IndexShardTests extends IndexShardTestCase {
        int numCorruptEntries = 0;
        for (int i = 0; i < numTotalEntries; i++) {
            if (randomBoolean()) {
                operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1,
                operations.add(new Translog.Index("_doc", "1", 0, primary.getPendingPrimaryTerm(), 1,
                    "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1));
            } else {
                // corrupt entry
                operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1,
                operations.add(new Translog.Index("_doc", "2", 1, primary.getPendingPrimaryTerm(), 1,
                    "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1));
                numCorruptEntries++;
            }
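Several of the assertions above distinguish getPendingPrimaryTerm() (bumped as soon as a new term is known) from operationPrimaryTerm (bumped only once the operation block runs). A small illustrative sketch of that two-field bookkeeping and its invariant (the class is hypothetical; only the field names follow the test code):

    // Illustrative sketch of two-term bookkeeping; hypothetical class, not IndexShard.
    final class TermStateSketch {
        private volatile long pendingPrimaryTerm;   // advances eagerly, e.g. on a cluster state update
        private volatile long operationPrimaryTerm; // advances only under the operation block

        synchronized void onNewTermAnnounced(long newTerm) {
            assert newTerm >= pendingPrimaryTerm;
            pendingPrimaryTerm = newTerm;           // visible immediately
        }

        synchronized void onOperationBlockAcquired() {
            // in-flight operations have drained; new operations now see the term
            operationPrimaryTerm = pendingPrimaryTerm;
        }

        boolean invariant() {
            return operationPrimaryTerm <= pendingPrimaryTerm;
        }
    }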
@@ -148,7 +148,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
        ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null);
        Engine.Delete delete = new Engine.Delete("test", "1", new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong());
        Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc);
        compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, SequenceNumbers.UNASSIGNED_SEQ_NO, true));
        compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, true));
        assertEquals(0, preIndex.get());
        assertEquals(0, postIndex.get());
        assertEquals(0, postIndexException.get());

@@ -172,7 +172,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
        assertEquals(2, postDelete.get());
        assertEquals(2, postDeleteException.get());

        compositeListener.postIndex(randomShardId, index, new Engine.IndexResult(0, SequenceNumbers.UNASSIGNED_SEQ_NO, false));
        compositeListener.postIndex(randomShardId, index, new Engine.IndexResult(0, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, false));
        assertEquals(0, preIndex.get());
        assertEquals(2, postIndex.get());
        assertEquals(0, postIndexException.get());
@@ -83,7 +83,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
        boolean syncNeeded = numDocs > 0;

        String allocationId = shard.routingEntry().allocationId().getId();
        shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
        shard.updateShardState(shard.routingEntry(), shard.getPendingPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
            new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
        shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
        assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());

@@ -142,7 +142,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
        }

        String allocationId = shard.routingEntry().allocationId().getId();
        shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
        shard.updateShardState(shard.routingEntry(), shard.getPendingPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
            new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());

        CountDownLatch syncCalledLatch = new CountDownLatch(1);
@@ -2669,7 +2669,7 @@ public class TranslogTests extends ESTestCase {

        Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
            1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
        Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomSeqNum, true);
        Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomPrimaryTerm, randomSeqNum, true);
        Translog.Index index = new Translog.Index(eIndex, eIndexResult);

        BytesStreamOutput out = new BytesStreamOutput();

@@ -2680,7 +2680,7 @@ public class TranslogTests extends ESTestCase {

        Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
            2, VersionType.INTERNAL, Origin.PRIMARY, 0);
        Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomSeqNum, true);
        Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomPrimaryTerm, randomSeqNum, true);
        Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);

        out = new BytesStreamOutput();
@@ -179,12 +179,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
        final int initialNumberOfDocs = randomIntBetween(16, 64);
        for (int i = 0; i < initialNumberOfDocs; i++) {
            final Engine.Index index = getIndex(Integer.toString(i));
            operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)));
            operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)));
        }
        final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
        for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
            final Engine.Index index = getIndex(Integer.toString(i));
            operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
            operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
        }
        operations.add(null);
        final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);

@@ -395,7 +395,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
        final IndexShard shard = mock(IndexShard.class);
        when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
        when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
        when(shard.isPrimaryMode()).thenReturn(false);
        when(shard.isRelocatedPrimary()).thenReturn(true);
        when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
        doAnswer(invocation -> {
            ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});

@@ -444,7 +444,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
        final CancellableThreads cancellableThreads = new CancellableThreads();
        final IndexShard shard = mock(IndexShard.class);
        final AtomicBoolean freed = new AtomicBoolean(true);
        when(shard.isPrimaryMode()).thenReturn(true);
        when(shard.isRelocatedPrimary()).thenReturn(false);
        doAnswer(invocation -> {
            freed.set(false);
            ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> freed.set(true));
@@ -201,7 +201,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
            if (randomBoolean()) {
                // create a new translog
                translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs,
                    replica.shardId(), replica.getPrimaryTerm());
                    replica.shardId(), replica.getPendingPrimaryTerm());
                translogGenToUse = 1;
            } else {
                translogUUIDtoUse = translogGeneration.translogUUID;
@@ -84,6 +84,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

@@ -233,7 +234,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
            activeIds.add(primary.routingEntry().allocationId().getId());
            ShardRouting startedRoutingEntry = ShardRoutingHelper.moveToStarted(primary.routingEntry());
            IndexShardRoutingTable routingTable = routingTable(shr -> shr == primary.routingEntry() ? startedRoutingEntry : shr);
            primary.updateShardState(startedRoutingEntry, primary.getPrimaryTerm(), null,
            primary.updateShardState(startedRoutingEntry, primary.getPendingPrimaryTerm(), null,
                currentClusterStateVersion.incrementAndGet(), activeIds, routingTable, Collections.emptySet());
            for (final IndexShard replica : replicas) {
                recoverReplica(replica);

@@ -279,20 +280,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
        /**
         * promotes the specific replica as the new primary
         */
        public synchronized Future<PrimaryReplicaSyncer.ResyncTask> promoteReplicaToPrimary(IndexShard replica) throws IOException {
            final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
            IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
            indexMetaData = newMetaData.build();
            assertTrue(replicas.remove(replica));
            closeShards(primary);
            primary = replica;
            assert primary.routingEntry().active() : "only active replicas can be promoted to primary: " + primary.routingEntry();
        public Future<PrimaryReplicaSyncer.ResyncTask> promoteReplicaToPrimary(IndexShard replica) throws IOException {
            PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
            ShardRouting primaryRouting = replica.routingEntry().moveActiveReplicaToPrimary();
            IndexShardRoutingTable routingTable = routingTable(shr -> shr == replica.routingEntry() ? primaryRouting : shr);

            primary.updateShardState(primaryRouting,
                newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
            promoteReplicaToPrimary(replica,
                (shard, listener) -> primaryReplicaSyncer.resync(shard,
                    new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
                        @Override
                        public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {

@@ -305,11 +296,27 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                            listener.onFailure(e);
                            fut.onFailure(e);
                        }
                    }), currentClusterStateVersion.incrementAndGet(), activeIds(), routingTable, Collections.emptySet());

                    }));
            return fut;
        }

        public synchronized void promoteReplicaToPrimary(IndexShard replica,
                BiConsumer<IndexShard, ActionListener<PrimaryReplicaSyncer.ResyncTask>> primaryReplicaSyncer)
                throws IOException {
            final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
            IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
            indexMetaData = newMetaData.build();
            assertTrue(replicas.remove(replica));
            closeShards(primary);
            primary = replica;
            assert primary.routingEntry().active() : "only active replicas can be promoted to primary: " + primary.routingEntry();
            ShardRouting primaryRouting = replica.routingEntry().moveActiveReplicaToPrimary();
            IndexShardRoutingTable routingTable = routingTable(shr -> shr == replica.routingEntry() ? primaryRouting : shr);

            primary.updateShardState(primaryRouting, newTerm, primaryReplicaSyncer, currentClusterStateVersion.incrementAndGet(),
                activeIds(), routingTable, Collections.emptySet());
        }

        private synchronized Set<String> activeIds() {
            return shardRoutings().stream()
                .filter(ShardRouting::active).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
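The refactored promoteReplicaToPrimary above returns a Future so callers can block with .get() until the promotion, including the resync callback, has finished. The underlying adaptation from a callback to a future is a standard pattern; a generic self-contained sketch (the Listener interface here is a simplified stand-in for ActionListener, not the Elasticsearch type):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    // Generic sketch of adapting a callback-style async operation to a future,
    // as the refactored test helper does with PlainActionFuture.
    final class ListenerFutureSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        static <T> CompletableFuture<T> run(Consumer<Listener<T>> asyncOp) {
            CompletableFuture<T> future = new CompletableFuture<>();
            asyncOp.accept(new Listener<T>() {
                @Override public void onResponse(T value) { future.complete(value); }
                @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
            });
            return future;
        }

        public static void main(String[] args) throws Exception {
            // usage: the async operation completes the listener on another thread
            CompletableFuture<String> f =
                run(l -> new Thread(() -> l.onResponse("promoted")).start());
            System.out.println(f.get()); // blocks until the callback fires
        }
    }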
@@ -425,7 +432,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase

        private void updateAllocationIDsOnPrimary() throws IOException {

            primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
            primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
                currentClusterStateVersion.incrementAndGet(),
                activeIds(), routingTable(Function.identity()), Collections.emptySet());
        }
    }

@@ -527,7 +535,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
            IndexShard replica = replicationGroup.replicas.stream()
                .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
            replica.acquireReplicaOperationPermit(
                replicationGroup.primary.getPrimaryTerm(),
                replicationGroup.primary.getPendingPrimaryTerm(),
                globalCheckpoint,
                new ActionListener<Releasable>() {
                    @Override

@@ -605,7 +613,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase

        @Override
        protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
            executeShardBulkOnReplica(request, replica, getPrimaryShard().getPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
            executeShardBulkOnReplica(request, replica, getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
        }
    }

@@ -664,14 +672,18 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
     * indexes the given requests on the supplied replica shard
     */
    void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
        executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
        indexOnReplica(request, group, replica, group.primary.getPendingPrimaryTerm());
    }

    void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica, long term) throws Exception {
        executeShardBulkOnReplica(request, replica, term, group.primary.getGlobalCheckpoint());
    }

    /**
     * Executes the delete request on the given replica shard.
     */
    void deleteOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
        executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
        executeShardBulkOnReplica(request, replica, group.primary.getPendingPrimaryTerm(), group.primary.getGlobalCheckpoint());
    }

    class GlobalCheckpointSync extends ReplicationAction<
@@ -425,7 +425,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
        IndexShardRoutingTable newRoutingTable = new IndexShardRoutingTable.Builder(shardRouting.shardId())
            .addShard(shardRouting)
            .build();
        shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
        shard.updateShardState(shardRouting, shard.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
            inSyncIds, newRoutingTable, Collections.emptySet());
    }

@@ -514,8 +514,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
            request,
            (int) ByteSizeUnit.MB.toBytes(1),
            Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
        primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
            inSyncIds, routingTable, Collections.emptySet());
        primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
            currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());
        recovery.recoverToTarget();
        recoveryTarget.markAsDone();
    }

@@ -536,9 +536,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
        Set<String> inSyncIdsWithReplica = new HashSet<>(inSyncIds);
        inSyncIdsWithReplica.add(replica.routingEntry().allocationId().getId());
        // update both primary and replica shard state
        primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
            inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
        replica.updateShardState(replica.routingEntry().moveToStarted(), replica.getPrimaryTerm(), null,
        primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
            currentClusterStateVersion.incrementAndGet(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
        replica.updateShardState(replica.routingEntry().moveToStarted(), replica.getPendingPrimaryTerm(), null,
            currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
    }

@@ -560,7 +560,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
            .removeShard(replica.routingEntry())
            .addShard(routingEntry)
            .build();
        replica.updateShardState(routingEntry, replica.getPrimaryTerm() + 1,
        replica.updateShardState(routingEntry, replica.getPendingPrimaryTerm() + 1,
            (is, listener) ->
                listener.onResponse(new PrimaryReplicaSyncer.ResyncTask(1, "type", "action", "desc", null, Collections.emptyMap())),
            currentClusterStateVersion.incrementAndGet(),