Introduce internal post recovery state

Introduce a new internal, index shard level, post recovery state, where the shard moves to when its done with recovery. The shard will now move to started only once the cluster state with its respective cluster state level state is started.

This change allow to have more fine grained control over when to allow reads on a shard, resolving potential refresh temporal visibility aspects while indexing and issuign a refresh. By only allowing reads on started shards, and making sure we refresh right before we move to started
This commit is contained in:
Shay Banon 2013-09-24 15:06:37 +02:00
parent 10de3a7ecb
commit 397f442c6d
7 changed files with 70 additions and 51 deletions

View File

@ -179,8 +179,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
indexShard.start("post recovery from gateway");
if (indexShard.state() != IndexShardState.POST_RECOVERY) {
indexShard.postRecovery("post recovery from gateway");
}
// refresh the shard
indexShard.refresh(new Engine.Refresh("post_gateway").force(true));

View File

@ -429,7 +429,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// no translog files, bail
recoveryStatus.start().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
indexShard.start("post recovery from gateway, no translog");
indexShard.postRecovery("post recovery from gateway, no translog");
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
return;

View File

@ -154,7 +154,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
if (translogId == -1) {
// no translog files, bail
indexShard.start("post recovery from gateway, no translog");
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
@ -189,7 +189,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) {
// no translog to recovery from, start and bail
// no translog files, bail
indexShard.start("post recovery from gateway, no translog");
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());

View File

@ -70,7 +70,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
}
indexShard.start("post recovery from gateway");
indexShard.postRecovery("post recovery from gateway");
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());

View File

@ -27,9 +27,10 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
public enum IndexShardState {
CREATED((byte) 0),
RECOVERING((byte) 1),
STARTED((byte) 2),
RELOCATED((byte) 3),
CLOSED((byte) 4);
POST_RECOVERY((byte) 2),
STARTED((byte) 3),
RELOCATED((byte) 4),
CLOSED((byte) 5);
private final byte id;

View File

@ -260,6 +260,14 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
synchronized (mutex) {
if (state != IndexShardState.POST_RECOVERY) {
logger.debug("suspected wrong state when acting on cluster state started state, current state {}", state);
}
logger.debug("state: [{}]->[{}], reason [global state moved to started]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
}
indicesLifecycle.afterIndexShardStarted(this);
}
this.shardRouting = newRouting;
indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting);
@ -285,6 +293,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state == IndexShardState.RECOVERING) {
throw new IndexShardRecoveringException(shardId);
}
if (state == IndexShardState.POST_RECOVERY) {
throw new IndexShardRecoveringException(shardId);
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason);
state = IndexShardState.RECOVERING;
return returnValue;
@ -302,29 +313,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this;
}
public InternalIndexShard start(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
checkIndex(true);
}
engine.start();
startScheduledTasksIfNeeded();
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.STARTED, reason);
state = IndexShardState.STARTED;
}
indicesLifecycle.afterIndexShardStarted(this);
return this;
}
@Override
public IndexShardState state() {
return state;
@ -340,7 +328,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override
public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
writeAllowed();
writeAllowed(create.origin());
create = indexingService.preCreate(create);
if (logger.isTraceEnabled()) {
logger.trace("index {}", create.docs());
@ -361,7 +349,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override
public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
writeAllowed();
writeAllowed(index.origin());
index = indexingService.preIndex(index);
try {
if (logger.isTraceEnabled()) {
@ -386,7 +374,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override
public void delete(Engine.Delete delete) throws ElasticSearchException {
writeAllowed();
writeAllowed(delete.origin());
delete = indexingService.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
@ -417,7 +405,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override
public void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException {
writeAllowed();
writeAllowed(deleteByQuery.origin());
if (logger.isTraceEnabled()) {
logger.trace("delete_by_query [{}]", deleteByQuery.query());
}
@ -578,7 +566,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public <T> T snapshot(Engine.SnapshotHandler<T> snapshotHandler) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.CLOSED) {
if (state != IndexShardState.POST_RECOVERY && state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.CLOSED) {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
return engine.snapshot(snapshotHandler);
@ -620,6 +608,29 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.checkIndexTook;
}
public InternalIndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
checkIndex(true);
}
engine.start();
startScheduledTasksIfNeeded();
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.POST_RECOVERY, reason);
state = IndexShardState.POST_RECOVERY;
}
return this;
}
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
@ -657,11 +668,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
translog.clearUnreferenced();
engine.refresh(new Engine.Refresh("recovery_finalization").force(true));
synchronized (mutex) {
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.POST_RECOVERY);
state = IndexShardState.POST_RECOVERY;
}
startScheduledTasksIfNeeded();
indicesLifecycle.afterIndexShardStarted(this);
engine.enableGcDeletes(true);
}
@ -691,8 +701,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types())
.origin(Engine.Operation.Origin.RECOVERY));
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()).origin(Engine.Operation.Origin.RECOVERY));
break;
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");
@ -722,7 +731,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
*/
public boolean ignoreRecoveryAttempt() {
IndexShardState state = state(); // one time volatile read
return state == IndexShardState.RECOVERING || state == IndexShardState.STARTED ||
return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED ||
state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED;
}
@ -733,13 +742,27 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
private void writeAllowed() throws IllegalIndexShardStateException {
verifyStartedOrRecovering();
private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (origin == Engine.Operation.Origin.PRIMARY) {
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
// otherwise, we need to retry, we also want to still allow to index if we are relocated in case it fails
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
}
} else {
// for replicas, we allow to write also while recovering, since we index also during recovery to replicas
// and rely on version checks to make sure its consistent
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
}
}
}
private void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING) {
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search.basic;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -44,28 +43,24 @@ public class SearchWhileCreatingIndexTests extends AbstractIntegrationTest {
@Test
@Slow
@AwaitsFix(bugUrl = "fix is coming")
public void testIndexCausesIndexCreation() throws Exception {
searchWhileCreatingIndex(-1, 1); // 1 replica in our default...
}
@Test
@Slow
@AwaitsFix(bugUrl = "fix is coming")
public void testNoReplicas() throws Exception {
searchWhileCreatingIndex(10, 0);
}
@Test
@Slow
@AwaitsFix(bugUrl = "fix is coming")
public void testOneReplica() throws Exception {
searchWhileCreatingIndex(10, 1);
}
@Test
@Slow
@AwaitsFix(bugUrl = "fix is coming")
public void testTwoReplicas() throws Exception {
searchWhileCreatingIndex(10, 2);
}