[ENGINE] Flush IndexWriter to disk on close and shutdown
Today we trash everything that has been indexed but not flushed to disk if the engine is closed. This might not be desired if we are shutting down a node for restart / upgrade or if we close / archive an index. In such a case we would like to flush the transaction log and commit everything to disk. This commit adds a flag to the close method that is set on close and shutdown but not when we remove the shard due to relocations.
This commit is contained in:
parent
85c611a1b7
commit
ddd16deb1d
|
@ -388,7 +388,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
// and close the shard so no operations are allowed to it
|
||||
if (indexShard != null) {
|
||||
try {
|
||||
indexShard.close(reason);
|
||||
final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted
|
||||
indexShard.close(reason, flushEngine);
|
||||
} catch (Throwable e) {
|
||||
logger.debug("[{}] failed to close index shard", e, shardId);
|
||||
// ignore
|
||||
|
|
|
@ -1034,6 +1034,27 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
protected abstract void closeNoLock(String reason) throws ElasticsearchException;
|
||||
|
||||
public void flushAndClose() throws IOException {
|
||||
if (isClosed.get() == false) {
|
||||
logger.trace("flushAndClose now acquire writeLock");
|
||||
try (ReleasableLock _ = writeLock.acquire()) {
|
||||
logger.trace("flushAndClose now acquired writeLock");
|
||||
try {
|
||||
logger.debug("flushing shard on close - this might take some time to sync files to disk");
|
||||
try {
|
||||
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
|
||||
} catch (FlushNotAllowedEngineException ex) {
|
||||
logger.debug("flush not allowed during flushAndClose - skipping");
|
||||
} catch (EngineClosedException ex) {
|
||||
logger.debug("engine already closed - skipping flushAndClose");
|
||||
}
|
||||
} finally {
|
||||
close(); // double close is not a problem
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
|
||||
|
|
|
@ -74,6 +74,7 @@ public final class EngineConfig {
|
|||
private final CodecService codecService;
|
||||
private final Engine.FailedEngineListener failedEngineListener;
|
||||
|
||||
|
||||
/**
|
||||
* Index setting for index concurrency / number of threadstates in the indexwriter.
|
||||
* The default depends on the number of CPUs in the system. We use 0.65 times the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
|
||||
|
|
|
@ -60,7 +60,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -868,6 +867,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Closes the engine without acquiring the write lock. This should only be
|
||||
* called while the write lock is held or in a disaster condition, i.e. if the engine
|
||||
|
|
|
@ -85,8 +85,9 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
|||
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, Validator.BOOLEAN);
|
||||
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, Validator.BOOLEAN);
|
||||
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
|
||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
|
||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
|
||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
|
||||
|
|
|
@ -174,6 +174,13 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
|
||||
|
||||
private final MapperAnalyzer mapperAnalyzer;
|
||||
private volatile boolean flushOnClose = true;
|
||||
|
||||
/**
|
||||
* Index setting to control if a flush is executed before engine is closed
|
||||
* This setting is realtime updateable.
|
||||
*/
|
||||
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
|
||||
|
||||
@Inject
|
||||
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
|
||||
|
@ -213,6 +220,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
this.shardBitsetFilterCache = shardBitsetFilterCache;
|
||||
state = IndexShardState.CREATED;
|
||||
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
|
||||
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
|
||||
indexSettingsService.addListener(applyRefreshSettings);
|
||||
|
||||
this.mapperAnalyzer = new MapperAnalyzer(mapperService);
|
||||
|
@ -657,7 +665,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return engine().acquireSearcher(source);
|
||||
}
|
||||
|
||||
public void close(String reason) throws IOException {
|
||||
public void close(String reason, boolean flushEngine) throws IOException {
|
||||
synchronized (mutex) {
|
||||
try {
|
||||
indexSettingsService.removeListener(applyRefreshSettings);
|
||||
|
@ -670,7 +678,13 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
changeState(IndexShardState.CLOSED, reason);
|
||||
} finally {
|
||||
final Engine engine = this.currentEngineReference.getAndSet(null);
|
||||
IOUtils.close(engine);
|
||||
try {
|
||||
if (flushEngine && this.flushOnClose) {
|
||||
engine.flushAndClose();
|
||||
}
|
||||
} finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
|
||||
IOUtils.close(engine);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -935,6 +949,10 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
}
|
||||
|
||||
public final boolean isFlushOnClose() {
|
||||
return flushOnClose;
|
||||
}
|
||||
|
||||
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
|
@ -943,6 +961,12 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
if (state == IndexShardState.CLOSED) {
|
||||
return;
|
||||
}
|
||||
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
|
||||
if (flushOnClose != IndexShard.this.flushOnClose) {
|
||||
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose);
|
||||
IndexShard.this.flushOnClose = flushOnClose;
|
||||
}
|
||||
|
||||
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval);
|
||||
if (!refreshInterval.equals(IndexShard.this.refreshInterval)) {
|
||||
logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval);
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.GatewayMetaState;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
||||
/**
|
||||
* Simple unit tests for IndexShard-related operations.
|
||||
*/
|
||||
public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
||||
|
||||
public void testFlushOnDeleteSetting() throws Exception {
|
||||
boolean initValue = randomBoolean();
|
||||
createIndex("test", settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, initValue).build());
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
IndexShard shard = test.shard(0);
|
||||
assertEquals(initValue, shard.isFlushOnClose());
|
||||
final boolean newValue = !initValue;
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, newValue).build()));
|
||||
assertEquals(newValue, shard.isFlushOnClose());
|
||||
|
||||
try {
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build()));
|
||||
fail("exception expected");
|
||||
} catch (ElasticsearchIllegalArgumentException ex) {
|
||||
|
||||
}
|
||||
assertEquals(newValue, shard.isFlushOnClose());
|
||||
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.monitor.fs.FsStats;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportModule;
|
||||
import org.junit.Test;
|
||||
|
@ -77,6 +78,7 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest {
|
|||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.refresh_interval", "-1")
|
||||
.put(MockInternalEngine.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
|
||||
.put("index.gateway.local.sync", "1s") // fsync the translog every second
|
||||
));
|
||||
ensureYellow();
|
||||
|
|
|
@ -25,13 +25,26 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
|
@ -288,4 +301,42 @@ public class OpenCloseIndexTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(indexMetaData.getState(), equalTo(expectedState));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException {
|
||||
String mapping = XContentFactory.jsonBuilder().
|
||||
startObject().
|
||||
startObject("type").
|
||||
startObject("properties").
|
||||
startObject("test")
|
||||
.field("type", "string")
|
||||
.field("index", "not_analyzed")
|
||||
.endObject().
|
||||
endObject().
|
||||
endObject()
|
||||
.endObject().string();
|
||||
|
||||
assertAcked(client().admin().indices().prepareCreate("test")
|
||||
.addMapping("type", mapping));
|
||||
ensureGreen();
|
||||
int docs = between(10, 100);
|
||||
IndexRequestBuilder[] builder = new IndexRequestBuilder[docs];
|
||||
for (int i = 0; i < docs ; i++) {
|
||||
builder[i] = client().prepareIndex("test", "initial", "" + i).setSource("test", "init");
|
||||
}
|
||||
indexRandom(true, builder);
|
||||
if (randomBoolean()) {
|
||||
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).execute().get();
|
||||
}
|
||||
client().admin().indices().prepareClose("test").execute().get();
|
||||
|
||||
// check the index still contains the records that we indexed
|
||||
client().admin().indices().prepareOpen("test").execute().get();
|
||||
ensureGreen();
|
||||
SearchResponse searchResponse = client().prepareSearch().setTypes("initial").setQuery(QueryBuilders.matchQuery("test", "init")).get();
|
||||
assertNoFailures(searchResponse);
|
||||
assertHitCount(searchResponse, docs);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.action.count.CountResponse;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateResponse;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
|
@ -36,7 +37,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
|
||||
import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.builder;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.*;
|
||||
|
@ -105,7 +104,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
|
|||
@Slow
|
||||
public void testRestartNodePercolator2() throws Exception {
|
||||
internalCluster().startNode();
|
||||
assertAcked(prepareCreate("test").addMapping("type1", "field1", "type=string"));
|
||||
assertAcked(prepareCreate("test").addMapping("type1", "field1", "type=string").addMapping(PercolatorService.TYPE_NAME, "color", "type=string"));
|
||||
|
||||
logger.info("--> register a query");
|
||||
client().prepareIndex("test", PercolatorService.TYPE_NAME, "kuku")
|
||||
|
@ -133,8 +132,8 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
|
|||
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
|
||||
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
|
||||
assertThat(client().prepareCount().setTypes(PercolatorService.TYPE_NAME).setQuery(matchAllQuery()).get().getCount(), equalTo(1l));
|
||||
CountResponse countResponse = client().prepareCount().setTypes(PercolatorService.TYPE_NAME).setQuery(matchAllQuery()).get();
|
||||
assertHitCount(countResponse, 1l);
|
||||
|
||||
DeleteIndexResponse actionGet = client().admin().indices().prepareDelete("test").get();
|
||||
assertThat(actionGet.isAcknowledged(), equalTo(true));
|
||||
|
|
|
@ -43,18 +43,21 @@ import java.util.concurrent.ConcurrentMap;
|
|||
public class MockInternalEngine extends InternalEngine {
|
||||
public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
|
||||
public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
|
||||
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
|
||||
|
||||
public static class MockContext {
|
||||
public final Random random;
|
||||
public final boolean wrapReader;
|
||||
public final Class<? extends FilterDirectoryReader> wrapper;
|
||||
public final Settings indexSettings;
|
||||
private final double flushOnClose;
|
||||
|
||||
public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
|
||||
this.random = random;
|
||||
this.wrapReader = wrapReader;
|
||||
this.wrapper = wrapper;
|
||||
this.indexSettings = indexSettings;
|
||||
flushOnClose = indexSettings.getAsDouble(FLUSH_ON_CLOSE_RATIO, 0.5d);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +82,11 @@ public class MockInternalEngine extends InternalEngine {
|
|||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
super.close();
|
||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||
super.flushAndClose();
|
||||
} else {
|
||||
super.close();
|
||||
}
|
||||
} finally {
|
||||
if (logger.isTraceEnabled()) {
|
||||
// log debug if we have pending searchers
|
||||
|
@ -91,6 +98,15 @@ public class MockInternalEngine extends InternalEngine {
|
|||
logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushAndClose() throws IOException {
|
||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||
super.flushAndClose();
|
||||
} else {
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
|
||||
|
|
Loading…
Reference in New Issue