From 7739aad1aa164827e0729610e248dd30db9a7915 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Jul 2017 10:18:46 +0200 Subject: [PATCH] Add testing around recovery to TruncateTranslogIT --- .../index/translog/TruncateTranslogIT.java | 200 ++++++++++++++++-- 1 file changed, 178 insertions(+), 22 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index 4ef26b9357c..75180339805 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -29,9 +29,10 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -43,15 +44,18 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import 
org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.engine.MockEngineSupport; -import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; @@ -63,7 +67,6 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -72,7 +75,10 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) @@ -84,23 +90,40 @@ public class TruncateTranslogIT extends ESIntegTestCase { } public void testCorruptTranslogTruncation() throws Exception { - internalCluster().startNodes(1, Settings.EMPTY); + internalCluster().startNodes(2, Settings.EMPTY); - assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", "-1") - .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog - )); + final String replicaNode = internalCluster().getNodeNames()[1]; + + 
assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.refresh_interval", "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog + .put("index.routing.allocation.exclude._name", replicaNode) + )); ensureYellow(); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", (String)null) + )); + // Index some documents - int numDocs = scaledRandomIntBetween(100, 1000); - IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + logger.info("--> indexing more doc to be kept"); + int numDocsToKeep = randomIntBetween(0, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; for (int i = 0; i < builders.length; i++) { builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); } + indexRandom(false, false, false, Arrays.asList(builders)); + flush("test"); disableTranslogFlush("test"); + // having no extra docs is an interesting case for seq no based recoveries - test it more often + int numDocsToTruncate = randomBoolean() ? 
0 : randomIntBetween(0, 100); + logger.info("--> indexing [{}] more doc to be truncated", numDocsToTruncate); + builders = new IndexRequestBuilder[numDocsToTruncate]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); + } indexRandom(false, false, false, Arrays.asList(builders)); Set translogDirs = getTranslogDirs("test"); @@ -120,17 +143,32 @@ public class TruncateTranslogIT extends ESIntegTestCase { } } + final boolean expectSeqNoRecovery; + if (randomBoolean() && numDocsToTruncate > 0) { + // flush the replica, so it will have more docs than what the primary will have + Index index = resolveIndex("test"); + IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0)); + replica.flush(new FlushRequest()); + expectSeqNoRecovery = false; + logger.info("--> ops based recovery disabled by flushing replica"); + } else { + expectSeqNoRecovery = true; + } + + // shut down the replica node to be tested later + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); + // Corrupt the translog file(s) logger.info("--> corrupting translog"); corruptRandomTranslogFiles("test"); // Restart the single node logger.info("--> restarting node"); - internalCluster().fullRestart(); + internalCluster().restartRandomDataNode(); client().admin().cluster().prepareHealth().setWaitForYellowStatus() - .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)) - .setWaitForEvents(Priority.LANGUID) - .get(); + .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)) + .setWaitForEvents(Priority.LANGUID) + .get(); try { client().prepareSearch("test").setQuery(matchAllQuery()).get(); @@ -149,7 +187,7 @@ public class TruncateTranslogIT extends ESIntegTestCase { assertBusy(() -> { logger.info("--> checking that lock has been released for {}", idxLocation); try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); - Lock 
writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { // Great, do nothing, we just wanted to obtain the lock } catch (LockObtainFailedException lofe) { logger.info("--> failed acquiring lock for {}", idxLocation); @@ -171,14 +209,122 @@ public class TruncateTranslogIT extends ESIntegTestCase { ensureYellow("test"); // Run a search and make sure it succeeds - SearchResponse resp = client().prepareSearch("test").setQuery(matchAllQuery()).get(); - ElasticsearchAssertions.assertNoFailures(resp); + assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), numDocsToKeep); + + logger.info("--> starting the replica node to test recovery"); + internalCluster().startNode(); + ensureGreen("test"); + assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), numDocsToKeep); + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); + final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() + .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); + assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), + expectSeqNoRecovery ? 
equalTo(0) : greaterThan(0)); + } + + public void testCorruptTranslogTruncationOfReplica() throws Exception { + internalCluster().startNodes(2, Settings.EMPTY); + + final String primaryNode = internalCluster().getNodeNames()[0]; + final String replicaNode = internalCluster().getNodeNames()[1]; + + assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.refresh_interval", "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog + .put("index.routing.allocation.exclude._name", replicaNode) + )); + ensureYellow(); + + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", (String)null) + )); + ensureGreen(); + + // Index some documents + logger.info("--> indexing more doc to be kept"); + int numDocsToKeep = randomIntBetween(0, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + flush("test"); + disableTranslogFlush("test"); + // having no extra docs is an interesting case for seq no based recoveries - test it more often + int numDocsToTruncate = randomBoolean() ? 
0 : randomIntBetween(0, 100); + logger.info("--> indexing [{}] more doc to be truncated", numDocsToTruncate); + builders = new IndexRequestBuilder[numDocsToTruncate]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + final int totalDocs = numDocsToKeep + numDocsToTruncate; + + + // sample the replica node translog dirs + final ShardId shardId = new ShardId(resolveIndex("test"), 0); + Set translogDirs = getTranslogDirs(replicaNode, shardId); + + // stop the cluster nodes. we don't use full restart so the node start up order will be the same + // and shard roles will be maintained + internalCluster().stopRandomDataNode(); + internalCluster().stopRandomDataNode(); + + // Corrupt the translog file(s) + logger.info("--> corrupting translog"); + corruptTranslogFiles(translogDirs); + + // Restart the single node + logger.info("--> starting node"); + internalCluster().startNode(); + + ensureYellow(); + + // Run a search and make sure it succeeds + assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), totalDocs); + + TruncateTranslogCommand ttc = new TruncateTranslogCommand(); + MockTerminal t = new MockTerminal(); + OptionParser parser = ttc.getParser(); + + for (Path translogDir : translogDirs) { + final Path idxLocation = translogDir.getParent().resolve("index"); + assertBusy(() -> { + logger.info("--> checking that lock has been released for {}", idxLocation); + try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); + Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + // Great, do nothing, we just wanted to obtain the lock + } catch (LockObtainFailedException lofe) { + logger.info("--> failed acquiring lock for {}", idxLocation); + fail("still waiting for lock release at [" + idxLocation + "]"); + } catch (IOException ioe) { + fail("Got an IOException: " + ioe); + 
} + }); + + OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); + logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath()); + ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... */); + logger.info("--> output:\n{}", t.getOutput()); + } + + logger.info("--> starting the replica node to test recovery"); + internalCluster().startNode(); + ensureGreen("test"); + assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), totalDocs); + + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); + final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() + .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); + // the replica translog was disabled so it doesn't know what the global checkpoint is and thus can't do ops based recovery + assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); } private Set getTranslogDirs(String indexName) throws IOException { ClusterState state = client().admin().cluster().prepareState().get().getState(); GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false); - final Index idx = state.metaData().index(indexName).getIndex(); List iterators = iterableAsArrayList(shardIterators); ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators); ShardRouting shardRouting = shardIterator.nextOrNull(); @@ -186,11 +332,17 @@ public class TruncateTranslogIT extends ESIntegTestCase { assertTrue(shardRouting.primary()); assertTrue(shardRouting.assignedToNode()); String nodeId = shardRouting.currentNodeId(); + ShardId shardId = shardRouting.shardId(); + return getTranslogDirs(nodeId, shardId); + } + + private Set
getTranslogDirs(String nodeId, ShardId shardId) { NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); Set translogDirs = new TreeSet<>(); // treeset makes sure iteration order is deterministic for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) { String path = fsPath.getPath(); - final String relativeDataLocationPath = "indices/"+ idx.getUUID() +"/" + Integer.toString(shardRouting.getId()) + "/translog"; + final String relativeDataLocationPath = "indices/"+ shardId.getIndex().getUUID() +"/" + Integer.toString(shardId.getId()) + + "/translog"; Path translogPath = PathUtils.get(path).resolve(relativeDataLocationPath); if (Files.isDirectory(translogPath)) { translogDirs.add(translogPath); @@ -201,6 +353,10 @@ public class TruncateTranslogIT extends ESIntegTestCase { private void corruptRandomTranslogFiles(String indexName) throws IOException { Set translogDirs = getTranslogDirs(indexName); + corruptTranslogFiles(translogDirs); + } + + private void corruptTranslogFiles(Set translogDirs) throws IOException { Set files = new TreeSet<>(); // treeset makes sure iteration order is deterministic for (Path translogDir : translogDirs) { if (Files.isDirectory(translogDir)) {