diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dbdb13f03d7..1f9fe8ac0fe 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -682,15 +682,11 @@ public class InternalEngine extends Engine { @Override public void flush() throws EngineException { - flush(true, false, false); + flush(false, false); } @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - flush(true, force, waitIfOngoing); - } - - private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); /* * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise @@ -714,37 +710,21 @@ public class InternalEngine extends Engine { logger.trace("acquired flush lock immediately"); } try { - if (commitTranslog) { - if (flushNeeded || force) { - flushNeeded = false; - try { - translog.prepareCommit(); - logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translog); - logger.trace("finished commit for flush"); - translog.commit(); - // we need to refresh in order to clear older version values - refresh("version_table_flush"); - - } catch (Throwable e) { - throw new FlushFailedEngineException(shardId, e); - } - } - } else { - // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a - // translog on an index that was opened on a committed point in time that is "in the future" - // of that translog - // we allow to *just* commit if there is an ongoing recovery happening... - // its ok to use this, only a flush will cause a new translogFileGeneration, and we are locked here from - // other flushes use flushLock + if (flushNeeded || force) { + flushNeeded = false; + final long translogId; try { - logger.trace("starting commit for flush; commitTranslog=false"); + translog.prepareCommit(); + logger.trace("starting commit for flush; commitTranslog=true"); commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); + translog.commit(); + // we need to refresh in order to clear older version values + refresh("version_table_flush"); + } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); } - } /* * we have to inc-ref the store here since if the engine is closed by a tragic event @@ -838,7 +818,7 @@ public class InternalEngine extends Engine { indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/); } if (flush) { - flush(true, true, true); + flush(true, true); } if (upgrade) { logger.info("finished segment upgrade"); @@ -865,7 +845,7 @@ public class InternalEngine extends Engine { // the to a write lock when we fail the engine in this operation if (flushFirst) { logger.trace("start flush for snapshot"); - flush(false, false, true); + flush(false, true); logger.trace("finish flush for snapshot"); } try (ReleasableLock lock = readLock.acquire()) { diff --git a/src/main/java/org/elasticsearch/index/store/IndexStoreModule.java b/src/main/java/org/elasticsearch/index/store/IndexStoreModule.java index c40301d560b..3d351179d21 100644 --- a/src/main/java/org/elasticsearch/index/store/IndexStoreModule.java +++ b/src/main/java/org/elasticsearch/index/store/IndexStoreModule.java @@ -32,7 +32,7 @@ public class IndexStoreModule extends AbstractModule implements SpawnModules { private final Settings settings; - public static enum Type { + public enum Type { NIOFS { @Override public boolean match(String setting) { diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java index 5890233fe27..738f4c106b1 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.engine.MockEngineSupport; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.store.MockFSIndexStoreModule; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.junit.Test; diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 37adeca328f..4ef3a153204 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -1556,7 +1556,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0); List shards = snapshotStatus.getShards(); for (SnapshotIndexShardStatus status : shards) { - assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files + assertThat(status.getStats().getProcessedFiles(), equalTo(0)); } } diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 5d8209b753e..cf0a52df92f 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -365,7 +365,7 @@ public final class InternalTestCluster extends TestCluster { .put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms") .put(SETTING_CLUSTER_NODE_SEED, seed); if (ENABLE_MOCK_MODULES && usually(random)) { - builder.put(IndexStoreModule.STORE_TYPE, MockFSIndexStoreModule.class.getName()); // no RAM dir for now! + builder.put(IndexStoreModule.STORE_TYPE, MockFSIndexStoreModule.class.getName()); builder.put(IndexShardModule.ENGINE_FACTORY, MockEngineFactory.class); builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName()); builder.put(BigArraysModule.IMPL, MockBigArraysModule.class.getName()); diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java index 2a8d7cff621..00e493da1c3 100644 --- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -107,16 +107,7 @@ public class MockFSDirectoryService extends FsDirectoryService { public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, @IndexSettings Settings indexSettings) { if (indexShard != null && shardId.equals(sid)) { - logger.info("{} shard state before potentially flushing is {}", indexShard.shardId(), indexShard.state()); if (validCheckIndexStates.contains(indexShard.state()) && IndexMetaData.isOnSharedFilesystem(indexSettings) == false) { - // When the the internal engine closes we do a rollback, which removes uncommitted segments - // By doing a commit flush we perform a Lucene commit, but don't clear the translog, - // so that even in tests where don't flush we can check the integrity of the Lucene index - if (indexShard.engine().hasUncommittedChanges()) { // only if we have any changes - logger.info("{} flushing in order to run checkindex", indexShard.shardId()); - Releasables.close(indexShard.engine().snapshotIndex(true)); // Keep translog for tests that rely on replaying it - } - logger.info("{} flush finished in beforeIndexShardClosed", indexShard.shardId()); canRun = true; } }