Merge branch 'master' into feature/synced_flush

Conflicts:
	src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Simon Willnauer 2015-05-19 14:55:45 +02:00
commit 242a452142
2 changed files with 86 additions and 13 deletions

src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -94,9 +94,6 @@ public class InternalEngine extends Engine {
private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
// we use flushNeeded here since, if there are no changes, the commit will not really
// happen, and then the commitUserData and the new translog will not be reflected
private volatile boolean flushNeeded = false;
private final Lock flushLock = new ReentrantLock();
private final ReentrantLock optimizeLock = new ReentrantLock();
@@ -348,7 +345,6 @@ public class InternalEngine extends Engine {
innerCreate(create);
}
}
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine("create", t);
throw new CreateFailedEngineException(shardId, create, t);
@@ -455,7 +451,6 @@ public class InternalEngine extends Engine {
created = innerIndex(index);
}
}
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine("index", t);
throw new IndexFailedEngineException(shardId, index, t);
@@ -553,7 +548,6 @@ public class InternalEngine extends Engine {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
innerDelete(delete);
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine("delete", t);
throw new DeleteFailedEngineException(shardId, delete, t);
@@ -648,7 +642,6 @@ public class InternalEngine extends Engine {
indexWriter.deleteDocuments(query);
translog.add(new Translog.DeleteByQuery(delete));
flushNeeded = true;
} catch (Throwable t) {
maybeFailEngine("delete_by_query", t);
throw new DeleteByQueryFailedEngineException(shardId, delete, t);
@@ -748,8 +741,7 @@ public class InternalEngine extends Engine {
logger.trace("acquired flush lock immediately");
}
try {
if (flushNeeded || force) {
flushNeeded = false;
if (indexWriter.hasUncommittedChanges() || force) {
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");

src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java

@@ -19,16 +19,13 @@
package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.*;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -68,6 +65,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
@@ -76,10 +74,14 @@ import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -505,12 +507,91 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertThat(corruptedFile, notNullValue());
}
/**
* This test verifies that if we corrupt a replica, we can still get to green, even though
* listing its store fails. Note, we need to make sure that replicas are allocated on all data
* nodes, so that a replica can't sneakily end up allocated on a node that doesn't have a
* corrupted copy.
*/
@Test
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11226")
public void testReplicaCorruption() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
}
indexRandom(true, builders);
ensureGreen();
// we have to flush at least once here since we don't corrupt the translog
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
final Map<String, List<Path>> filesToCorrupt = findFilesToCorruptForReplica();
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
List<Path> paths = filesToCorrupt.get(nodeName);
if (paths != null) {
for (Path path : paths) {
try (OutputStream os = Files.newOutputStream(path)) {
os.write(0);
}
logger.info("corrupting file {} on node {}", path, nodeName);
}
}
return null;
}
});
ensureGreen();
}
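
One subtlety in the restart callback above: Files.newOutputStream(path) with no explicit OpenOptions opens the file with CREATE, TRUNCATE_EXISTING and WRITE, so the single os.write(0) does not patch one byte in place, it replaces the entire segments_N file with a single zero byte. A stand-alone sketch of that behavior (file names are illustrative):

import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class TruncateOnOpenDemo {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("segments_", ".demo"); // stand-in for a real segments_N
        Files.write(file, new byte[]{1, 2, 3, 4});              // pretend this is commit metadata
        try (OutputStream os = Files.newOutputStream(file)) {   // no options: CREATE, TRUNCATE_EXISTING, WRITE
            os.write(0);
        }
        System.out.println(Files.size(file)); // 1 -- the original four bytes are gone
    }
}

Either way the replica's commit point is unreadable afterwards, which is exactly the corruption the test needs to recover from.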
private int numShards(String... index) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(index, false);
return shardIterators.size();
}
private Map<String, List<Path>> findFilesToCorruptForReplica() throws IOException {
Map<String, List<Path>> filesToNodes = new HashMap<>();
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ShardRouting shardRouting : state.getRoutingTable().allShards("test")) {
if (shardRouting.primary()) {
continue;
}
assertTrue(shardRouting.assignedToNode());
NodesStatsResponse nodeStatsResponse = client().admin().cluster().prepareNodesStats(shardRouting.currentNodeId()).setFs(true).get();
NodeStats nodeStats = nodeStatsResponse.getNodes()[0];
List<Path> files = new ArrayList<>();
filesToNodes.put(nodeStats.getNode().getName(), files);
for (FsStats.Info info : nodeStats.getFs()) {
String path = info.getPath();
final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/index";
Path file = PathUtils.get(path).resolve(relativeDataLocationPath);
if (Files.exists(file)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(file)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
files.add(item);
}
}
}
}
}
}
return filesToNodes;
}
private ShardRouting corruptRandomPrimaryFile() throws IOException {
return corruptRandomPrimaryFile(true);
}