Remove RELOCATED index shard state (#29246)

The RELOCATED state is redundant, as this information is already covered by ReplicationTracker.primaryMode.
This commit is contained in:
Yannick Welsch 2018-03-28 12:25:46 +02:00 committed by GitHub
parent ea8e3661d0
commit cacf759213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 82 additions and 78 deletions

View File

@ -1001,7 +1001,7 @@ public abstract class TransportReplicationAction<
} }
public boolean isRelocated() { public boolean isRelocated() {
return indexShard.state() == IndexShardState.RELOCATED; return indexShard.isPrimaryMode() == false;
} }
@Override @Override

View File

@ -731,7 +731,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
continue; continue;
case POST_RECOVERY: case POST_RECOVERY:
case STARTED: case STARTED:
case RELOCATED:
try { try {
shard.trimTranslog(); shard.trimTranslog();
} catch (IndexShardClosedException | AlreadyClosedException ex) { } catch (IndexShardClosedException | AlreadyClosedException ex) {
@ -751,7 +750,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
case CLOSED: case CLOSED:
case CREATED: case CREATED:
case RECOVERING: case RECOVERING:
case RELOCATED:
continue; continue;
case POST_RECOVERY: case POST_RECOVERY:
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active"; assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";

View File

@ -84,7 +84,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* to replica mode (using {@link #completeRelocationHandoff}), as the relocation target will be in charge of the global checkpoint * to replica mode (using {@link #completeRelocationHandoff}), as the relocation target will be in charge of the global checkpoint
* computation from that point on. * computation from that point on.
*/ */
boolean primaryMode; volatile boolean primaryMode;
/** /**
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
@ -252,6 +252,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
return globalCheckpoints; return globalCheckpoints;
} }
/**
* Returns whether the replication tracker is in primary mode, i.e., whether the current shard is acting as primary from the point of
* view of replication.
*/
public boolean isPrimaryMode() {
return primaryMode;
}
/** /**
* Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication * Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication
* as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however. * as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however.

View File

@ -217,15 +217,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexShardOperationPermits indexShardOperationPermits; private final IndexShardOperationPermits indexShardOperationPermits;
private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started) // for primaries, we only allow to write when actually started (so the cluster has decided we started)
// in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be // in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be
// in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target // in state RECOVERING or POST_RECOVERY.
// which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries). // for replicas, replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure it's consistent
public static final EnumSet<IndexShardState> writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);
// replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure it's consistent
// a relocated shard can also be the target of a replication if the relocation target has not been marked as active yet and is syncing its changes back to the relocation source // a relocated shard can also be the target of a replication if the relocation target has not been marked as active yet and is syncing its changes back to the relocation source
private static final EnumSet<IndexShardState> writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private static final EnumSet<IndexShardState> writeAllowedStates = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);
private final IndexSearcherWrapper searcherWrapper; private final IndexSearcherWrapper searcherWrapper;
@ -412,15 +410,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (state == IndexShardState.RELOCATED && } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two // failure / cancellation). The reason is that at the moment we cannot safely reactivate primary mode without risking two
// active primaries. // active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
} }
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.CLOSED :
state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
persistMetadata(path, indexSettings, newRouting, currentRouting, logger); persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
final CountDownLatch shardStateUpdated = new CountDownLatch(1); final CountDownLatch shardStateUpdated = new CountDownLatch(1);
@ -538,9 +535,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (state == IndexShardState.STARTED) { if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId); throw new IndexShardStartedException(shardId);
} }
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
if (state == IndexShardState.RECOVERING) { if (state == IndexShardState.RECOVERING) {
throw new IndexShardRecoveringException(shardId); throw new IndexShardRecoveringException(shardId);
} }
@ -558,13 +552,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
* {@link Runnable} is executed after all operations are successfully blocked. * {@link Runnable} is executed after all operations are successfully blocked.
* *
* @param reason the reason for the relocation
* @param consumer a {@link Runnable} that is executed after operations are blocked * @param consumer a {@link Runnable} that is executed after operations are blocked
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
* @throws InterruptedException if blocking operations is interrupted * @throws InterruptedException if blocking operations is interrupted
*/ */
public void relocated( public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
final String reason, final Consumer<ReplicationTracker.PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try { try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
@ -581,9 +573,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
consumer.accept(primaryContext); consumer.accept(primaryContext);
synchronized (mutex) { synchronized (mutex) {
verifyRelocatingState(); verifyRelocatingState();
changeState(IndexShardState.RELOCATED, reason); replicationTracker.completeRelocationHandoff(); // make changes to primaryMode flag only under mutex
} }
replicationTracker.completeRelocationHandoff();
} catch (final Exception e) { } catch (final Exception e) {
try { try {
replicationTracker.abortRelocationHandoff(); replicationTracker.abortRelocationHandoff();
@ -1083,7 +1074,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException { public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
final IndexShardState state = this.state; // one time volatile read final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
return getEngine().acquireLastIndexCommit(flushFirst); return getEngine().acquireLastIndexCommit(flushFirst);
} else { } else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
@ -1097,7 +1088,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexShardState state = this.state; // one time volatile read final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
return getEngine().acquireSafeIndexCommit(); return getEngine().acquireSafeIndexCommit();
} else { } else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
@ -1202,9 +1193,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (state == IndexShardState.STARTED) { if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId); throw new IndexShardStartedException(shardId);
} }
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
// we need to refresh again to expose all operations that were index until now. Otherwise // we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately // we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener. // responded to in addRefreshListener.
@ -1408,7 +1396,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public boolean ignoreRecoveryAttempt() { public boolean ignoreRecoveryAttempt() {
IndexShardState state = state(); // one time volatile read IndexShardState state = state(); // one time volatile read
return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED || return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED ||
state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; state == IndexShardState.CLOSED;
} }
public void readAllowed() throws IllegalIndexShardStateException { public void readAllowed() throws IllegalIndexShardStateException {
@ -1426,20 +1414,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException { private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (origin == Engine.Operation.Origin.PRIMARY) { if (origin.isRecovery()) {
verifyPrimary();
if (writeAllowedStatesForPrimary.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForPrimary + ", origin [" + origin + "]");
}
} else if (origin.isRecovery()) {
if (state != IndexShardState.RECOVERING) { if (state != IndexShardState.RECOVERING) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]"); throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]");
} }
} else { } else {
assert origin == Engine.Operation.Origin.REPLICA; if (origin == Engine.Operation.Origin.PRIMARY) {
verifyReplicationTarget(); verifyPrimary();
if (writeAllowedStatesForReplica.contains(state) == false) { } else {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForReplica + ", origin [" + origin + "]"); assert origin == Engine.Operation.Origin.REPLICA;
verifyReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");
} }
} }
} }
@ -1452,7 +1439,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void verifyReplicationTarget() { private void verifyReplicationTarget() {
final IndexShardState state = state(); final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { if (shardRouting.primary() && shardRouting.active() && replicationTracker.isPrimaryMode()) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
"relocation hand off, state is [" + state + "]"); "relocation hand off, state is [" + state + "]");
@ -1476,7 +1463,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected final void verifyActive() throws IllegalIndexShardStateException { protected final void verifyActive() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { if (state != IndexShardState.STARTED) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active"); throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active");
} }
} }
@ -1778,7 +1765,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void maybeSyncGlobalCheckpoint(final String reason) { public void maybeSyncGlobalCheckpoint(final String reason) {
verifyPrimary(); verifyPrimary();
verifyNotClosed(); verifyNotClosed();
if (state == IndexShardState.RELOCATED) { if (replicationTracker.isPrimaryMode() == false) {
return; return;
} }
// only sync if there are not operations in flight // only sync if there are not operations in flight
@ -1831,7 +1818,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
* to recovery finalization, or even finished recovery before the update arrives here. * to recovery finalization, or even finished recovery before the update arrives here.
*/ */
assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED && state() != IndexShardState.RELOCATED : assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED :
"supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " + "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " +
"that is higher than its local checkpoint [" + localCheckpoint + "]"; "that is higher than its local checkpoint [" + localCheckpoint + "]";
return; return;
@ -1850,7 +1837,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) && assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getEngine().getLocalCheckpointTracker().getCheckpoint() == getEngine().getLocalCheckpointTracker().getCheckpoint() ==
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
replicationTracker.activateWithPrimaryContext(primaryContext); synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}
} }
/** /**
@ -2067,6 +2056,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
} }
/**
* Returns whether the shard is in primary mode, i.e., in charge of replicating changes (see {@link ReplicationTracker}).
*/
public boolean isPrimaryMode() {
return replicationTracker.isPrimaryMode();
}
class ShardEventListener implements Engine.EventListener { class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
@ -2205,8 +2201,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// means that the master will fail this shard as all initializing shards are failed when a primary is selected // means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY && if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED && shardState != IndexShardState.STARTED) {
shardState != IndexShardState.RELOCATED) {
throw new IndexShardNotStartedException(shardId, shardState); throw new IndexShardNotStartedException(shardId, shardState);
} }
try { try {

View File

@ -30,7 +30,7 @@ public class IndexShardRelocatedException extends IllegalIndexShardStateExceptio
} }
public IndexShardRelocatedException(ShardId shardId, String reason) { public IndexShardRelocatedException(ShardId shardId, String reason) {
super(shardId, IndexShardState.RELOCATED, reason); super(shardId, IndexShardState.STARTED, reason);
} }
public IndexShardRelocatedException(StreamInput in) throws IOException{ public IndexShardRelocatedException(StreamInput in) throws IOException{

View File

@ -25,16 +25,18 @@ public enum IndexShardState {
RECOVERING((byte) 1), RECOVERING((byte) 1),
POST_RECOVERY((byte) 2), POST_RECOVERY((byte) 2),
STARTED((byte) 3), STARTED((byte) 3),
RELOCATED((byte) 4), // previously, 4 was the RELOCATED state
CLOSED((byte) 5); CLOSED((byte) 5);
private static final IndexShardState[] IDS = new IndexShardState[IndexShardState.values().length]; private static final IndexShardState[] IDS = new IndexShardState[IndexShardState.values().length + 1]; // +1 for RELOCATED state
static { static {
for (IndexShardState state : IndexShardState.values()) { for (IndexShardState state : IndexShardState.values()) {
assert state.id() < IDS.length && state.id() >= 0; assert state.id() < IDS.length && state.id() >= 0;
IDS[state.id()] = state; IDS[state.id()] = state;
} }
assert IDS[4] == null;
IDS[4] = STARTED; // for backward compatibility reasons (this was the RELOCATED state)
} }
private final byte id; private final byte id;

View File

@ -89,7 +89,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
private final Cancellable scheduler; private final Cancellable scheduler;
private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of( private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);
private final ShardsIndicesStatusChecker statusChecker; private final ShardsIndicesStatusChecker statusChecker;

View File

@ -236,8 +236,8 @@ public class RecoverySourceHandler {
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = onAcquired.actionGet()) { try (Releasable ignored = onAcquired.actionGet()) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated() // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (shard.state() == IndexShardState.RELOCATED) { if (shard.isPrimaryMode() == false) {
throw new IndexShardRelocatedException(shard.shardId()); throw new IndexShardRelocatedException(shard.shardId());
} }
runnable.run(); runnable.run();
@ -501,9 +501,9 @@ public class RecoverySourceHandler {
if (request.isPrimaryRelocation()) { if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off"); logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext)); cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/* /*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}). * target are failed (see {@link IndexShard#updateRoutingEntry}).
*/ */
} }

View File

@ -209,7 +209,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
} }
RecoveryState.Stage stage = indexShard.recoveryState().getStage(); RecoveryState.Stage stage = indexShard.recoveryState().getStage();
if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) { if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {
// once primary relocation has moved past the finalization step, the relocation source can be moved to RELOCATED state // once primary relocation has moved past the finalization step, the relocation source can put the target into primary mode
// and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this // and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this
// state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing // state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing
// documents indexed and acknowledged before the reset. // documents indexed and acknowledged before the reset.

View File

@ -76,7 +76,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS), Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS),
Property.NodeScope); Property.NodeScope);
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED);
private final IndicesService indicesService; private final IndicesService indicesService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportService transportService; private final TransportService transportService;

View File

@ -685,6 +685,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final IndexShard shard = mock(IndexShard.class); final IndexShard shard = mock(IndexShard.class);
when(shard.getPrimaryTerm()).thenReturn(primaryTerm); when(shard.getPrimaryTerm()).thenReturn(primaryTerm);
when(shard.routingEntry()).thenReturn(routingEntry); when(shard.routingEntry()).thenReturn(routingEntry);
when(shard.isPrimaryMode()).thenReturn(true);
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) : Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
clusterService.state().metaData().index(index).inSyncAllocationIds(0); clusterService.state().metaData().index(index).inSyncAllocationIds(0);
@ -1217,7 +1218,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
return routing; return routing;
}); });
when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
when(indexShard.getPrimaryTerm()).thenAnswer(i -> when(indexShard.getPrimaryTerm()).thenAnswer(i ->
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));

View File

@ -472,7 +472,7 @@ public class TransportWriteActionTests extends ESTestCase {
} }
return routing; return routing;
}); });
when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
when(indexShard.getPrimaryTerm()).thenAnswer(i -> when(indexShard.getPrimaryTerm()).thenAnswer(i ->
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));

View File

@ -641,7 +641,7 @@ public class IndexShardTests extends IndexShardTestCase {
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
IndexShardTestCase.updateRoutingEntry(indexShard, routing); IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexShard.relocated("test", primaryContext -> {}); indexShard.relocated(primaryContext -> {});
engineClosed = false; engineClosed = false;
break; break;
} }
@ -1325,7 +1325,7 @@ public class IndexShardTests extends IndexShardTestCase {
Thread recoveryThread = new Thread(() -> { Thread recoveryThread = new Thread(() -> {
latch.countDown(); latch.countDown();
try { try {
shard.relocated("simulated recovery", primaryContext -> {}); shard.relocated(primaryContext -> {});
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1336,14 +1336,14 @@ public class IndexShardTests extends IndexShardTestCase {
recoveryThread.start(); recoveryThread.start();
latch.await(); latch.await();
// recovery can only be finalized after we release the current primaryOperationLock // recovery can only be finalized after we release the current primaryOperationLock
assertThat(shard.state(), equalTo(IndexShardState.STARTED)); assertTrue(shard.isPrimaryMode());
} }
// recovery can be now finalized // recovery can be now finalized
recoveryThread.join(); recoveryThread.join();
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertFalse(shard.isPrimaryMode());
try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// lock can again be acquired // lock can again be acquired
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertFalse(shard.isPrimaryMode());
} }
closeShards(shard); closeShards(shard);
@ -1354,7 +1354,7 @@ public class IndexShardTests extends IndexShardTestCase {
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> { Thread recoveryThread = new Thread(() -> {
try { try {
shard.relocated("simulated recovery", primaryContext -> {}); shard.relocated(primaryContext -> {});
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1385,6 +1385,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testStressRelocated() throws Exception { public void testStressRelocated() throws Exception {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
assertTrue(shard.isPrimaryMode());
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
final int numThreads = randomIntBetween(2, 4); final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads]; Thread[] indexThreads = new Thread[numThreads];
@ -1407,7 +1408,7 @@ public class IndexShardTests extends IndexShardTestCase {
AtomicBoolean relocated = new AtomicBoolean(); AtomicBoolean relocated = new AtomicBoolean();
final Thread recoveryThread = new Thread(() -> { final Thread recoveryThread = new Thread(() -> {
try { try {
shard.relocated("simulated recovery", primaryContext -> {}); shard.relocated(primaryContext -> {});
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -1419,15 +1420,15 @@ public class IndexShardTests extends IndexShardTestCase {
recoveryThread.start(); recoveryThread.start();
assertThat(relocated.get(), equalTo(false)); assertThat(relocated.get(), equalTo(false));
assertThat(shard.getActiveOperationsCount(), greaterThan(0)); assertThat(shard.getActiveOperationsCount(), greaterThan(0));
// ensure we only transition to RELOCATED state after pending operations completed // ensure we only transition after pending operations completed
assertThat(shard.state(), equalTo(IndexShardState.STARTED)); assertTrue(shard.isPrimaryMode());
// complete pending operations // complete pending operations
barrier.await(); barrier.await();
// complete recovery/relocation // complete recovery/relocation
recoveryThread.join(); recoveryThread.join();
// ensure relocated successfully once pending operations are done // ensure relocated successfully once pending operations are done
assertThat(relocated.get(), equalTo(true)); assertThat(relocated.get(), equalTo(true));
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertFalse(shard.isPrimaryMode());
assertThat(shard.getActiveOperationsCount(), equalTo(0)); assertThat(shard.getActiveOperationsCount(), equalTo(0));
for (Thread indexThread : indexThreads) { for (Thread indexThread : indexThreads) {
@ -1441,7 +1442,7 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard shard = newStartedShard(true); final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting originalRouting = shard.routingEntry();
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.relocated("test", primaryContext -> {}); shard.relocated(primaryContext -> {});
expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting));
closeShards(shard); closeShards(shard);
} }
@ -1451,7 +1452,7 @@ public class IndexShardTests extends IndexShardTestCase {
final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting originalRouting = shard.routingEntry();
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
IndexShardTestCase.updateRoutingEntry(shard, originalRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {})); expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated(primaryContext -> {}));
closeShards(shard); closeShards(shard);
} }
@ -1470,7 +1471,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
cyclicBarrier.await(); cyclicBarrier.await();
shard.relocated("test", primaryContext -> {}); shard.relocated(primaryContext -> {});
} }
}); });
relocationThread.start(); relocationThread.start();
@ -1491,7 +1492,7 @@ public class IndexShardTests extends IndexShardTestCase {
cyclicBarrier.await(); cyclicBarrier.await();
relocationThread.join(); relocationThread.join();
cancellingThread.join(); cancellingThread.join();
if (shard.state() == IndexShardState.RELOCATED) { if (shard.isPrimaryMode() == false) {
logger.debug("shard was relocated successfully"); logger.debug("shard was relocated successfully");
assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class)); assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class));
assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true)); assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true));
@ -1763,8 +1764,8 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(shard.state(), equalTo(IndexShardState.STARTED)); assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
shard.relocated("simulate mark as relocated", primaryContext -> {}); shard.relocated(primaryContext -> {});
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertFalse(shard.isPrimaryMode());
try { try {
IndexShardTestCase.updateRoutingEntry(shard, origRouting); IndexShardTestCase.updateRoutingEntry(shard, origRouting);
fail("Expected IndexShardRelocatedException"); fail("Expected IndexShardRelocatedException");

View File

@ -58,7 +58,6 @@ import static org.elasticsearch.index.shard.IndexShardState.CLOSED;
import static org.elasticsearch.index.shard.IndexShardState.CREATED; import static org.elasticsearch.index.shard.IndexShardState.CREATED;
import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY; import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY;
import static org.elasticsearch.index.shard.IndexShardState.RECOVERING; import static org.elasticsearch.index.shard.IndexShardState.RECOVERING;
import static org.elasticsearch.index.shard.IndexShardState.RELOCATED;
import static org.elasticsearch.index.shard.IndexShardState.STARTED; import static org.elasticsearch.index.shard.IndexShardState.STARTED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -186,7 +185,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
ensureGreen(); ensureGreen();
//the 3 relocated shards get closed on the first node //the 3 relocated shards get closed on the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, RELOCATED, CLOSED); assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED);
//the 3 relocated shards get created on the second node //the 3 relocated shards get created on the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);

View File

@ -394,7 +394,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final IndexShard shard = mock(IndexShard.class); final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.state()).thenReturn(IndexShardState.RELOCATED); when(shard.isPrimaryMode()).thenReturn(false);
when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class)); when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
doAnswer(invocation -> { doAnswer(invocation -> {
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {}); ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});

View File

@ -85,7 +85,7 @@ public class MockFSIndexStore extends IndexStore {
} }
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of( private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY IndexShardState.STARTED, IndexShardState.POST_RECOVERY
); );
private static final class Listener implements IndexEventListener { private static final class Listener implements IndexEventListener {