From ddd16deb1d13987dab7dbffbc66954e9e4085dbc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 17 Feb 2015 09:57:55 +0100 Subject: [PATCH] [ENGINE] Flush IndexWriter to disk on close and shutdown Today we trash everything that has been indexed but not flushed to disk if the engine is closed. This might not be desired if we are shutting down a node for restart / upgrade or if we close / archive an index. In such a case we would like to flush the transaction log and commit everything to disk. This commit adds a flag to the close method that is set on close and shutdown but not when we remove the shard due to relocations --- .../org/elasticsearch/index/IndexService.java | 3 +- .../elasticsearch/index/engine/Engine.java | 21 ++++++ .../index/engine/EngineConfig.java | 1 + .../index/engine/InternalEngine.java | 2 +- .../settings/IndexDynamicSettingsModule.java | 5 +- .../elasticsearch/index/shard/IndexShard.java | 28 +++++++- .../index/shard/IndexShardTests.java | 66 +++++++++++++++++++ .../index/store/CorruptedTranslogTests.java | 2 + .../indices/state/OpenCloseIndexTests.java | 51 ++++++++++++++ .../percolator/RecoveryPercolatorTests.java | 9 ++- .../test/engine/MockInternalEngine.java | 18 ++++- 11 files changed, 194 insertions(+), 12 deletions(-) create mode 100644 src/test/java/org/elasticsearch/index/shard/IndexShardTests.java diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java index be85ace475e..3208f3b2506 100644 --- a/src/main/java/org/elasticsearch/index/IndexService.java +++ b/src/main/java/org/elasticsearch/index/IndexService.java @@ -388,7 +388,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone // and close the shard so no operations are allowed to it if (indexShard != null) { try { - indexShard.close(reason); + final boolean flushEngine = deleted.get() == false && closed.get(); // only flush if we are closed (closed index or shutdown) and if 
we are not deleted + indexShard.close(reason, flushEngine); } catch (Throwable e) { logger.debug("[{}] failed to close index shard", e, shardId); // ignore diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 0a758b4af67..69bba98bc42 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1034,6 +1034,27 @@ public abstract class Engine implements Closeable { protected abstract void closeNoLock(String reason) throws ElasticsearchException; + public void flushAndClose() throws IOException { + if (isClosed.get() == false) { + logger.trace("flushAndClose now acquire writeLock"); + try (ReleasableLock _ = writeLock.acquire()) { + logger.trace("flushAndClose now acquired writeLock"); + try { + logger.debug("flushing shard on close - this might take some time to sync files to disk"); + try { + flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running. 
+ } catch (FlushNotAllowedEngineException ex) { + logger.debug("flush not allowed during flushAndClose - skipping"); + } catch (EngineClosedException ex) { + logger.debug("engine already closed - skipping flushAndClose"); + } + } finally { + close(); // double close is not a problem + } + } + } + } + @Override public void close() throws IOException { if (isClosed.get() == false) { // don't acquire the write lock if we are already closed diff --git a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index e49651d0464..79ab1ead7a6 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -74,6 +74,7 @@ public final class EngineConfig { private final CodecService codecService; private final Engine.FailedEngineListener failedEngineListener; + /** * Index setting for index concurrency / number of threadstates in the indexwriter. * The default is depending on the number of CPUs in the system. We use a 0.65 the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES} diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ed7879f064a..4819e56a613 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -60,7 +60,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -868,6 +867,7 @@ public class InternalEngine extends Engine { } } + /** * Closes the engine without acquiring the write lock. 
This should only be * called while the write lock is hold or in a disaster condition ie. if the engine diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index bd696b076f8..26d7e00da2c 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -85,8 +85,9 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); - indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING); - indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING); + indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, Validator.BOOLEAN); + indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, Validator.BOOLEAN); + indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME); indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME); indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME); diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 825d472c894..df74f73e64e 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ 
b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -174,6 +174,13 @@ public class IndexShard extends AbstractIndexShardComponent { private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener(); private final MapperAnalyzer mapperAnalyzer; + private volatile boolean flushOnClose = true; + + /** + * Index setting to control if a flush is executed before engine is closed + * This setting is realtime updateable. + */ + public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; @Inject public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog, @@ -213,6 +220,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.shardBitsetFilterCache = shardBitsetFilterCache; state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); + this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); indexSettingsService.addListener(applyRefreshSettings); this.mapperAnalyzer = new MapperAnalyzer(mapperService); @@ -657,7 +665,7 @@ public class IndexShard extends AbstractIndexShardComponent { return engine().acquireSearcher(source); } - public void close(String reason) throws IOException { + public void close(String reason, boolean flushEngine) throws IOException { synchronized (mutex) { try { indexSettingsService.removeListener(applyRefreshSettings); @@ -670,7 +678,13 @@ public class IndexShard extends AbstractIndexShardComponent { changeState(IndexShardState.CLOSED, reason); } finally { final Engine engine = this.currentEngineReference.getAndSet(null); - IOUtils.close(engine); + try { + if (flushEngine && this.flushOnClose) { + engine.flushAndClose(); + } + } finally { // playing safe here and close the engine even if the above succeeds - close can be 
called multiple times + IOUtils.close(engine); + } } } } @@ -935,6 +949,10 @@ public class IndexShard extends AbstractIndexShardComponent { updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); } + public final boolean isFlushOnClose() { + return flushOnClose; + } + private class ApplyRefreshSettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -943,6 +961,12 @@ public class IndexShard extends AbstractIndexShardComponent { if (state == IndexShardState.CLOSED) { return; } + final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose); + if (flushOnClose != IndexShard.this.flushOnClose) { + logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose); + IndexShard.this.flushOnClose = flushOnClose; + } + TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval); if (!refreshInterval.equals(IndexShard.this.refreshInterval)) { logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval); diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java new file mode 100644 index 00000000000..93d2ad19344 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ElasticsearchSingleNodeTest; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +/** + * Simple unit-test IndexShard related operations. 
+ */ +public class IndexShardTests extends ElasticsearchSingleNodeTest { + + public void testFlushOnDeleteSetting() throws Exception { + boolean initValue = randomBoolean(); + createIndex("test", settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, initValue).build()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + IndexShard shard = test.shard(0); + assertEquals(initValue, shard.isFlushOnClose()); + final boolean newValue = !initValue; + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, newValue).build())); + assertEquals(newValue, shard.isFlushOnClose()); + + try { + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build())); + fail("exception expected"); + } catch (ElasticsearchIllegalArgumentException ex) { + + } + assertEquals(newValue, shard.isFlushOnClose()); + + } +} diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java index b31f1d0e496..ca57afc6af4 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.engine.MockInternalEngine; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.junit.Test; @@ -77,6 +78,7 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest { .put("index.number_of_shards", 1) 
.put("index.number_of_replicas", 0) .put("index.refresh_interval", "-1") + .put(MockInternalEngine.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog .put("index.gateway.local.sync", "1s") // fsync the translog every second )); ensureYellow(); diff --git a/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java b/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java index 8cfe8059857..1f469bed06c 100644 --- a/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java +++ b/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexTests.java @@ -25,13 +25,26 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.store.MockFSDirectoryService; import org.junit.Test; +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; 
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -288,4 +301,42 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest { assertThat(indexMetaData.getState(), equalTo(expectedState)); } } + + @Test + public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException { + String mapping = XContentFactory.jsonBuilder(). + startObject(). + startObject("type"). + startObject("properties"). + startObject("test") + .field("type", "string") + .field("index", "not_analyzed") + .endObject(). + endObject(). + endObject() + .endObject().string(); + + assertAcked(client().admin().indices().prepareCreate("test") + .addMapping("type", mapping)); + ensureGreen(); + int docs = between(10, 100); + IndexRequestBuilder[] builder = new IndexRequestBuilder[docs]; + for (int i = 0; i < docs ; i++) { + builder[i] = client().prepareIndex("test", "initial", "" + i).setSource("test", "init"); + } + indexRandom(true, builder); + if (randomBoolean()) { + client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).execute().get(); + } + client().admin().indices().prepareClose("test").execute().get(); + + // check the index still contains the records that we indexed + client().admin().indices().prepareOpen("test").execute().get(); + ensureGreen(); + SearchResponse searchResponse = client().prepareSearch().setTypes("initial").setQuery(QueryBuilders.matchQuery("test", "init")).get(); + assertNoFailures(searchResponse); + assertHitCount(searchResponse, docs); + } + + } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java index 2c7e4a1befc..b2faae0f0f0 100644 --- a/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/RecoveryPercolatorTests.java @@ -25,6 +25,7 @@ import 
org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; import org.elasticsearch.action.percolate.MultiPercolateResponse; import org.elasticsearch.action.percolate.PercolateResponse; @@ -36,7 +37,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.client.Requests.clusterHealthRequest; -import static org.elasticsearch.common.settings.ImmutableSettings.builder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.*; @@ -105,7 +104,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest { @Slow public void testRestartNodePercolator2() throws Exception { internalCluster().startNode(); - assertAcked(prepareCreate("test").addMapping("type1", "field1", "type=string")); + assertAcked(prepareCreate("test").addMapping("type1", "field1", "type=string").addMapping(PercolatorService.TYPE_NAME, "color", "type=string")); logger.info("--> register a query"); client().prepareIndex("test", PercolatorService.TYPE_NAME, "kuku") @@ -133,8 +132,8 @@ public class 
RecoveryPercolatorTests extends ElasticsearchIntegrationTest { ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - assertThat(client().prepareCount().setTypes(PercolatorService.TYPE_NAME).setQuery(matchAllQuery()).get().getCount(), equalTo(1l)); + CountResponse countResponse = client().prepareCount().setTypes(PercolatorService.TYPE_NAME).setQuery(matchAllQuery()).get(); + assertHitCount(countResponse, 1l); DeleteIndexResponse actionGet = client().admin().indices().prepareDelete("test").get(); assertThat(actionGet.isAcknowledged(), equalTo(true)); diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java index e9e1037dff7..d22d7994cb3 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -43,18 +43,21 @@ import java.util.concurrent.ConcurrentMap; public class MockInternalEngine extends InternalEngine { public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio"; public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper"; + public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio"; public static class MockContext { public final Random random; public final boolean wrapReader; public final Class wrapper; public final Settings indexSettings; + private final double flushOnClose; public MockContext(Random random, boolean wrapReader, Class wrapper, Settings indexSettings) { this.random = random; this.wrapReader = wrapReader; this.wrapper = wrapper; this.indexSettings = indexSettings; + flushOnClose = indexSettings.getAsDouble(FLUSH_ON_CLOSE_RATIO, 0.5d); } } @@ 
-79,7 +82,11 @@ public class MockInternalEngine extends InternalEngine { @Override public void close() throws IOException { try { - super.close(); + if (mockContext.flushOnClose > mockContext.random.nextDouble()) { + super.flushAndClose(); + } else { + super.close(); + } } finally { if (logger.isTraceEnabled()) { // log debug if we have pending searchers @@ -91,6 +98,15 @@ public class MockInternalEngine extends InternalEngine { logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get()); } + @Override + public void flushAndClose() throws IOException { + if (mockContext.flushOnClose > mockContext.random.nextDouble()) { + super.flushAndClose(); + } else { + super.close(); + } + } + @Override protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {