From 397f442c6dabf5cd2105ba01d6864fbbb22d11e2 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 24 Sep 2013 15:06:37 +0200 Subject: [PATCH] Introduce internal post recovery state Introduce a new internal, index shard level, post recovery state, where the shard moves to when its done with recovery. The shard will now move to started only once the cluster state with its respective cluster state level state is started. This change allow to have more fine grained control over when to allow reads on a shard, resolving potential refresh temporal visibility aspects while indexing and issuign a refresh. By only allowing reads on started shards, and making sure we refresh right before we move to started --- .../gateway/IndexShardGatewayService.java | 4 +- .../blobstore/BlobStoreIndexShardGateway.java | 2 +- .../gateway/local/LocalIndexShardGateway.java | 4 +- .../gateway/none/NoneIndexShardGateway.java | 2 +- .../index/shard/IndexShardState.java | 7 +- .../shard/service/InternalIndexShard.java | 97 ++++++++++++------- .../basic/SearchWhileCreatingIndexTests.java | 5 - 7 files changed, 70 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 81c47bd4e72..acc4de59dd9 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -179,8 +179,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations(); // start the shard if the gateway has not started it already - if (indexShard.state() != IndexShardState.STARTED) { - indexShard.start("post recovery from gateway"); + if (indexShard.state() != IndexShardState.POST_RECOVERY) { + indexShard.postRecovery("post recovery from gateway"); } // refresh the shard indexShard.refresh(new Engine.Refresh("post_gateway").force(true)); diff --git a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index b567477d7b8..f37fe73a85a 100644 --- a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -429,7 +429,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // no translog files, bail recoveryStatus.start().startTime(System.currentTimeMillis()); recoveryStatus.updateStage(RecoveryStatus.Stage.START); - indexShard.start("post recovery from gateway, no translog"); + indexShard.postRecovery("post recovery from gateway, no translog"); recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime()); recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook()); return; diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index e26c9aca583..8e4fae08940 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -154,7 +154,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen recoveryStatus.updateStage(RecoveryStatus.Stage.START); if (translogId == -1) { // no translog files, bail - indexShard.start("post recovery from gateway, no translog"); + indexShard.postRecovery("post recovery from gateway, no translog"); // no index, just start the shard and bail recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime()); recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook()); @@ -189,7 +189,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) { // no translog to recovery from, start and bail // no translog files, bail - indexShard.start("post recovery from gateway, no translog"); + indexShard.postRecovery("post recovery from gateway, no translog"); // no index, just start the shard and bail recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime()); recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook()); diff --git a/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index 1a3e8e01c31..16192202057 100644 --- a/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -70,7 +70,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement } catch (IOException e) { logger.warn("failed to clean store before starting shard", e); } - indexShard.start("post recovery from gateway"); + indexShard.postRecovery("post recovery from gateway"); recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); recoveryStatus.translog().startTime(System.currentTimeMillis()); recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShardState.java b/src/main/java/org/elasticsearch/index/shard/IndexShardState.java index d219786ece3..89a6f306373 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShardState.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShardState.java @@ -27,9 +27,10 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; public enum IndexShardState { CREATED((byte) 0), RECOVERING((byte) 1), - STARTED((byte) 2), - RELOCATED((byte) 3), - CLOSED((byte) 4); + POST_RECOVERY((byte) 2), + STARTED((byte) 3), + RELOCATED((byte) 4), + CLOSED((byte) 5); private final byte id; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 4689d1129c5..eae9b9c1811 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -260,6 +260,14 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } catch (Throwable t) { logger.debug("failed to refresh due to move to cluster wide started", t); } + synchronized (mutex) { + if (state != IndexShardState.POST_RECOVERY) { + logger.debug("suspected wrong state when acting on cluster state started state, current state {}", state); + } + logger.debug("state: [{}]->[{}], reason [global state moved to started]", state, IndexShardState.STARTED); + state = IndexShardState.STARTED; + } + indicesLifecycle.afterIndexShardStarted(this); } this.shardRouting = newRouting; indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting); @@ -285,6 +293,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (state == IndexShardState.RECOVERING) { throw new IndexShardRecoveringException(shardId); } + if (state == IndexShardState.POST_RECOVERY) { + throw new IndexShardRecoveringException(shardId); + } logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason); state = IndexShardState.RECOVERING; return returnValue; @@ -302,29 +313,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this; } - public InternalIndexShard start(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new IndexShardClosedException(shardId); - } - if (state == IndexShardState.STARTED) { - throw new IndexShardStartedException(shardId); - } - if (state == IndexShardState.RELOCATED) { - throw new IndexShardRelocatedException(shardId); - } - if (Booleans.parseBoolean(checkIndexOnStartup, false)) { - checkIndex(true); - } - engine.start(); - startScheduledTasksIfNeeded(); - logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.STARTED, reason); - state = IndexShardState.STARTED; - } - indicesLifecycle.afterIndexShardStarted(this); - return this; - } - @Override public IndexShardState state() { return state; @@ -340,7 +328,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException { - writeAllowed(); + writeAllowed(create.origin()); create = indexingService.preCreate(create); if (logger.isTraceEnabled()) { logger.trace("index {}", create.docs()); @@ -361,7 +349,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException { - writeAllowed(); + writeAllowed(index.origin()); index = indexingService.preIndex(index); try { if (logger.isTraceEnabled()) { @@ -386,7 +374,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public void delete(Engine.Delete delete) throws ElasticSearchException { - writeAllowed(); + writeAllowed(delete.origin()); delete = indexingService.preDelete(delete); try { if (logger.isTraceEnabled()) { @@ -417,7 +405,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException { - writeAllowed(); + writeAllowed(deleteByQuery.origin()); if (logger.isTraceEnabled()) { logger.trace("delete_by_query [{}]", deleteByQuery.query()); } @@ -578,7 +566,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I public T snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException { IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.CLOSED) { + if (state != IndexShardState.POST_RECOVERY && state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.CLOSED) { throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); } return engine.snapshot(snapshotHandler); @@ -620,6 +608,29 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this.checkIndexTook; } + + public InternalIndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { + synchronized (mutex) { + if (state == IndexShardState.CLOSED) { + throw new IndexShardClosedException(shardId); + } + if (state == IndexShardState.STARTED) { + throw new IndexShardStartedException(shardId); + } + if (state == IndexShardState.RELOCATED) { + throw new IndexShardRelocatedException(shardId); + } + if (Booleans.parseBoolean(checkIndexOnStartup, false)) { + checkIndex(true); + } + engine.start(); + startScheduledTasksIfNeeded(); + logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.POST_RECOVERY, reason); + state = IndexShardState.POST_RECOVERY; + } + return this; + } + /** * After the store has been recovered, we need to start the engine in order to apply operations */ @@ -657,11 +668,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I translog.clearUnreferenced(); engine.refresh(new Engine.Refresh("recovery_finalization").force(true)); synchronized (mutex) { - logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED); - state = IndexShardState.STARTED; + logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.POST_RECOVERY); + state = IndexShardState.POST_RECOVERY; } startScheduledTasksIfNeeded(); - indicesLifecycle.afterIndexShardStarted(this); engine.enableGcDeletes(true); } @@ -691,8 +701,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I break; case DELETE_BY_QUERY: Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; - engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()) - .origin(Engine.Operation.Origin.RECOVERY)); + engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()).origin(Engine.Operation.Origin.RECOVERY)); break; default: throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]"); @@ -722,7 +731,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I */ public boolean ignoreRecoveryAttempt() { IndexShardState state = state(); // one time volatile read - return state == IndexShardState.RECOVERING || state == IndexShardState.STARTED || + return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; } @@ -733,13 +742,27 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - private void writeAllowed() throws IllegalIndexShardStateException { - verifyStartedOrRecovering(); + private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException { + IndexShardState state = this.state; // one time volatile read + + if (origin == Engine.Operation.Origin.PRIMARY) { + // for primaries, we only allow to write when actually started (so the cluster has decided we started) + // otherwise, we need to retry, we also want to still allow to index if we are relocated in case it fails + if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering"); + } + } else { + // for replicas, we allow to write also while recovering, since we index also during recovery to replicas + // and rely on version checks to make sure its consistent + if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering"); + } + } } private void verifyStartedOrRecovering() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read - if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING) { + if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering"); } } diff --git a/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexTests.java b/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexTests.java index 2932867fc7a..beeafa5e4e5 100644 --- a/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexTests.java +++ b/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.basic; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -44,28 +43,24 @@ public class SearchWhileCreatingIndexTests extends AbstractIntegrationTest { @Test @Slow - @AwaitsFix(bugUrl = "fix is coming") public void testIndexCausesIndexCreation() throws Exception { searchWhileCreatingIndex(-1, 1); // 1 replica in our default... } @Test @Slow - @AwaitsFix(bugUrl = "fix is coming") public void testNoReplicas() throws Exception { searchWhileCreatingIndex(10, 0); } @Test @Slow - @AwaitsFix(bugUrl = "fix is coming") public void testOneReplica() throws Exception { searchWhileCreatingIndex(10, 1); } @Test @Slow - @AwaitsFix(bugUrl = "fix is coming") public void testTwoReplicas() throws Exception { searchWhileCreatingIndex(10, 2); }