[TEST] Add a corrupted replica test verifying its still allocated

Add a test that verifies that even though all replicas are corrupted on all available nodes, and listing of shard stores faield, it still get allocated and properly recovered from the primary shard
This commit is contained in:
Shay Banon 2015-05-19 13:18:43 +02:00
parent 84b24e130e
commit a1a16276da
1 changed files with 84 additions and 4 deletions

View File

@ -19,16 +19,13 @@
package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.*;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -68,6 +65,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
@ -76,10 +74,14 @@ import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -505,12 +507,90 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertThat(corruptedFile, notNullValue());
}
/**
* This test verifies that if we corrupt a replica, we can still get to green, even though
* listing its store fails. Note, we need to make sure that replicas are allocated on all data
* nodes, so that replica won't be sneaky and allocated on a node that doesn't have a corrupted
* replica.
*/
@Test
public void testReplicaCorruption() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
final Map<String, List<Path>> filesToCorrupt = findFilesToCorruptForReplica();
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
List<Path> paths = filesToCorrupt.get(nodeName);
if (paths != null) {
for (Path path : paths) {
try (OutputStream os = Files.newOutputStream(path)) {
os.write(0);
}
logger.info("corrupting file {} on node {}", path, nodeName);
}
}
return null;
}
});
ensureGreen();
}
private int numShards(String... index) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(index, false);
return shardIterators.size();
}
private Map<String, List<Path>> findFilesToCorruptForReplica() throws IOException {
Map<String, List<Path>> filesToNodes = new HashMap<>();
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ShardRouting shardRouting : state.getRoutingTable().allShards("test")) {
if (shardRouting.primary() == true) {
continue;
}
assertTrue(shardRouting.assignedToNode());
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(shardRouting.currentNodeId()).setFs(true).get();
NodeStats nodeStats = nodeStatses.getNodes()[0];
List<Path> files = new ArrayList<>();
filesToNodes.put(nodeStats.getNode().getName(), files);
for (FsStats.Info info : nodeStats.getFs()) {
String path = info.getPath();
final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/index";
Path file = PathUtils.get(path).resolve(relativeDataLocationPath);
if (Files.exists(file)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(file)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
files.add(item);
}
}
}
}
}
}
return filesToNodes;
}
private ShardRouting corruptRandomPrimaryFile() throws IOException {
return corruptRandomPrimaryFile(true);