From 4decc72da6abc426b97789ad149b31eef9aad583 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 6 Feb 2016 13:40:38 +0100 Subject: [PATCH] Fix recovery translog stats totals when recovering from store Recovery from store fails to correctly set the translog recovery stats. This fixes it and tightens up the logic bringing it all to IndexShard (previously it was set by the recovery logic). Closes #15974 Closes #16493 --- .../cluster/routing/ShardRouting.java | 2 +- .../index/engine/InternalEngine.java | 17 +------- .../elasticsearch/index/shard/IndexShard.java | 21 ++++++++-- .../index/shard/StoreRecovery.java | 5 --- .../shard/TranslogRecoveryPerformer.java | 21 ++++++++++ .../cluster/routing/ShardRoutingHelper.java | 5 +++ .../index/shard/IndexShardTests.java | 41 +++++++++++++++++-- 7 files changed, 84 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 7aaf6969948..7535aa1226e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -451,7 +451,7 @@ public final class ShardRouting implements Streamable, ToXContent { } /** - * Moves the shard from started to initializing and bumps the version + * Moves the shard from started to initializing */ void reinitializeShard() { ensureNotFrozen(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d9ee2f4177a..ff738c0140b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -43,7 +43,6 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.Nullable; @@ -68,7 +67,6 @@ import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -233,20 +231,7 @@ public class InternalEngine extends Engine { final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer(); try { Translog.Snapshot snapshot = translog.newSnapshot(); - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - try { - handler.performRecoveryOperation(this, operation, true); - opsRecovered++; - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.BAD_REQUEST) { - // mainly for MapperParsingException and Failure to detect xcontent - logger.info("ignoring recovery of a corrupt translog entry", e); - } else { - throw e; - } - } - } + opsRecovered = handler.recoveryFromSnapshot(this, snapshot); } catch (Throwable e) { throw new EngineException(shardId, "failed to recover from translog", e); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 705d4d5aa5a..bbd00951277 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -55,6 +55,7 @@ import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.NodeServicesProvider; +import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; @@ -89,13 +90,12 @@ import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; -import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.suggest.stats.ShardSuggestMetric; @@ -105,8 +105,8 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.percolator.PercolatorService; @@ -874,6 +874,12 @@ public class IndexShard extends AbstractIndexShardComponent { * After the store has been recovered, we need to start the engine in order to apply operations */ public void performTranslogRecovery(boolean indexExists) { + if (indexExists == false) { + // note: these are set when recovering from the translog + final RecoveryState.Translog translogStats = recoveryState().getTranslog(); + translogStats.totalOperations(0); + translogStats.totalOperationsOnStart(0); + } internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } @@ -1387,6 +1393,15 @@ public class IndexShard extends AbstractIndexShardComponent { assert recoveryState != null; recoveryState.getTranslog().incrementRecoveredOperations(); } + + @Override + public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { + assert recoveryState != null; + RecoveryState.Translog translogStats = recoveryState.getTranslog(); + translogStats.totalOperations(snapshot.totalOperations()); + translogStats.totalOperationsOnStart(snapshot.totalOperations()); + return super.recoveryFromSnapshot(engine, snapshot); + } }; return new EngineConfig(shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 5f5aa95a994..aaa30c147c0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -203,7 +203,6 @@ final class StoreRecovery { logger.trace("cleaning existing shard, shouldn't exists"); IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); writer.close(); - recoveryState.getTranslog().totalOperations(0); } } } catch (Throwable e) { @@ -224,10 +223,6 @@ final class StoreRecovery { } catch (IOException e) { logger.debug("failed to list file details", e); } - if (indexShouldExists == false) { - recoveryState.getTranslog().totalOperations(0); - recoveryState.getTranslog().totalOperationsOnStart(0); - } indexShard.performTranslogRecovery(indexShouldExists); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 4811ff1a275..f3f8f3c14cf 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.HashMap; @@ -77,6 +78,25 @@ public class TranslogRecoveryPerformer { return numOps; } + public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { + Translog.Operation operation; + int opsRecovered = 0; + while ((operation = snapshot.next()) != null) { + try { + performRecoveryOperation(engine, operation, true); + opsRecovered++; + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.BAD_REQUEST) { + // mainly for MapperParsingException and Failure to detect xcontent + logger.info("ignoring recovery of a corrupt translog entry", e); + } else { + throw e; + } + } + } + return opsRecovered; + } + public static class BatchOperationException extends ElasticsearchException { private final int completedOperations; @@ -182,6 +202,7 @@ public class TranslogRecoveryPerformer { // noop } + /** * Returns the recovered types modifying the mapping during the recovery */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java index fe7938f23b9..5d3466b5e43 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java @@ -48,6 +48,11 @@ public class ShardRoutingHelper { routing.reinitializeShard(); } + public static void reinit(ShardRouting routing, UnassignedInfo.Reason reason) { + routing.reinitializeShard(); + routing.updateUnassignedInfo(new UnassignedInfo(reason, "test_reinit")); + } + public static void moveToUnassigned(ShardRouting routing, UnassignedInfo info) { routing.moveToUnassigned(info); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ec2f8f32a20..778831245d1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -70,7 +70,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.NodeServicesProvider; @@ -865,10 +864,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService("test"); final IndexShard shard = test.getShardOrNull(0); - + int translogOps = 1; client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get(); if (randomBoolean()) { client().admin().indices().prepareFlush().get(); + translogOps = 0; } ShardRouting routing = new ShardRouting(shard.routingEntry()); test.removeShard(0, "b/c simon says so"); @@ -878,6 +878,10 @@ public class IndexShardTests extends ESSingleNodeTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue(newShard.recoverFromStore(localNode)); + assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); routing = new ShardRouting(routing); ShardRoutingHelper.moveToStarted(routing); newShard.updateRoutingEntry(routing, true); @@ -885,6 +889,36 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(response, 1); } + public void testRecoverFromCleanStore() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get(); + if (randomBoolean()) { + client().admin().indices().prepareFlush().get(); + } + ShardRouting routing = new ShardRouting(shard.routingEntry()); + test.removeShard(0, "b/c simon says so"); + ShardRoutingHelper.reinit(routing, UnassignedInfo.Reason.INDEX_CREATED); + IndexShard newShard = test.createShard(routing); + newShard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, + localNode)); + assertTrue(newShard.recoverFromStore(localNode)); + assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + routing = new ShardRouting(routing); + ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + SearchResponse response = client().prepareSearch().get(); + assertHitCount(response, 0); + } + public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException { createIndex("test"); ensureGreen(); @@ -1187,7 +1221,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { List operations = new ArrayList<>(); operations.add(new Translog.Index("testtype", "1", jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes())); newShard.prepareForIndexRecovery(); - newShard.performTranslogRecovery(true); + newShard.recoveryState().getTranslog().totalOperations(operations.size()); + newShard.skipTranslogRecovery(); newShard.performBatchRecovery(operations); assertFalse(newShard.getTranslog().syncNeeded()); }