Remove TranslogService and fold it into synchronous IndexShard API

This commit moves the size and ops based flush into a synchronous API into
IndexShard and removes the time-based flush alltogether since it' basically
covered by the inactive async flush API we have today. The functionality doesn't
need to be covered by scheduled task and async APIs while we can actually make all
the decisions in a sync manner which is way easier to control and to test.

Closes #13707
This commit is contained in:
Simon Willnauer 2015-09-22 11:04:46 +02:00
parent 4fb6386df3
commit 75e816400c
15 changed files with 204 additions and 297 deletions

View File

@ -304,7 +304,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
assert preVersionTypes[requestIndex] != null;
}
processAfter(request, indexShard, location);
processAfter(request.refresh(), indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
@ -500,21 +500,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
processAfter(request, indexShard, location);
}
private void processAfter(BulkShardRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_bulk");
} catch (Throwable e) {
// ignore
}
}
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
processAfter(request.refresh(), indexShard, location);
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {

View File

@ -138,8 +138,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
request.version(delete.version());
assert request.versionType().validateVersionForWrites(request.version());
processAfter(request, indexShard, delete.getTranslogLocation());
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());
DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
return new Tuple<>(response, shardRequest.request);
@ -151,7 +150,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
processAfter(request, indexShard, delete.getTranslogLocation());
processAfter(request.refresh(), indexShard, delete.getTranslogLocation());
}
@Override
@ -159,18 +158,4 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}
private void processAfter(DeleteRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Throwable e) {
// ignore
}
}
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}
}

View File

@ -170,7 +170,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfter(request, indexShard, location);
processAfter(request.refresh(), indexShard, location);
return new Tuple<>(response, shardRequest.request);
}
@ -193,20 +193,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
processAfter(request, indexShard, operation.getTranslogLocation());
processAfter(request.refresh(), indexShard, operation.getTranslogLocation());
}
private void processAfter(IndexRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}
}

View File

@ -1082,4 +1082,18 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
}
protected final void processAfter(boolean refresh, IndexShard indexShard, Translog.Location location) {
if (refresh) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
indexShard.maybeFlush();
}
}

View File

@ -81,7 +81,6 @@ import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@ -259,11 +258,9 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, Validator.TIME);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);
registerIndexDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);

View File

@ -59,7 +59,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
@ -435,9 +434,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable);
}
}
// now we can close the translog service, we need to close it before the we close the shard
// note the that the translog service is not there for shadow replicas
closeInjectorOptionalResource(sId, shardInjector, TranslogService.class);
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {

View File

@ -55,4 +55,6 @@ public abstract class AbstractIndexShardComponent implements IndexShardComponent
public String nodeName() {
return indexSettings.get("name", "");
}
}

View File

@ -46,10 +46,12 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexService;
@ -113,7 +115,10 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class IndexShard extends AbstractIndexShardComponent {
@ -175,12 +180,20 @@ public class IndexShard extends AbstractIndexShardComponent {
private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
private volatile boolean disableFlush;
/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
private final ShardPath path;
private final IndexShardOperationCounter indexShardOperationCounter;
@ -250,8 +263,10 @@ public class IndexShard extends AbstractIndexShardComponent {
cachingPolicy = new UsageTrackingQueryCachingPolicy();
}
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdOperations = indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
}
public Store store() {
@ -1050,6 +1065,25 @@ public class IndexShard extends AbstractIndexShardComponent {
storeRecoveryService.recover(this, shouldExist, recoveryListener);
}
/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
* Otherwise <code>false</code>.
*/
boolean shouldFlush() {
if (disableFlush == false) {
Engine engine = engineUnsafe();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.totalOperations() > flushThresholdOperations || translog.sizeInBytes() > flushThresholdSize.bytes();
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
}
}
}
return false;
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
@ -1058,6 +1092,22 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations);
if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations);
IndexShard.this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize);
if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize);
IndexShard.this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush);
if (disableFlush != IndexShard.this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush);
IndexShard.this.disableFlush = disableFlush;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
if (flushOnClose != IndexShard.this.flushOnClose) {
@ -1437,4 +1487,38 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) {
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", t);
}
}
@Override
protected void doRun() throws Exception {
flush(new FlushRequest());
}
@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
}
}
return false;
}
}

View File

@ -20,18 +20,15 @@
package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.engine.IndexSearcherWrapper;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.TranslogService;
/**
* The {@code IndexShardModule} module is responsible for binding the correct
@ -68,7 +65,6 @@ public class IndexShardModule extends AbstractModule {
bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
} else {
bind(IndexShard.class).asEagerSingleton();
bind(TranslogService.class).asEagerSingleton();
}
bind(EngineFactory.class).to(engineFactoryImpl);

View File

@ -108,6 +108,12 @@ public final class ShadowIndexShard extends IndexShard {
return engineFactory.newReadOnlyEngine(config);
}
@Override
public boolean shouldFlush() {
// we don't need to flush since we don't write - all dominated by the primary
return false;
}
public boolean allowsPrimaryPromotion() {
return false;
}

View File

@ -1,207 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
public class TranslogService extends AbstractIndexShardComponent implements Closeable {
public static final String INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final IndexShard indexShard;
private volatile TimeValue interval;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue flushThresholdPeriod;
private volatile boolean disableFlush;
private volatile ScheduledFuture future;
private final ApplySettings applySettings = new ApplySettings();
@Inject
public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.indexShard = indexShard;
this.flushThresholdOperations = indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.flushThresholdPeriod = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TimeValue.timeValueMinutes(30));
this.interval = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, timeValueMillis(5000));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
indexSettingsService.addListener(applySettings);
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
FutureUtils.cancel(this.future);
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, TranslogService.this.flushThresholdOperations);
if (flushThresholdOperations != TranslogService.this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", TranslogService.this.flushThresholdOperations, flushThresholdOperations);
TranslogService.this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, TranslogService.this.flushThresholdSize);
if (!flushThresholdSize.equals(TranslogService.this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", TranslogService.this.flushThresholdSize, flushThresholdSize);
TranslogService.this.flushThresholdSize = flushThresholdSize;
}
TimeValue flushThresholdPeriod = settings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TranslogService.this.flushThresholdPeriod);
if (!flushThresholdPeriod.equals(TranslogService.this.flushThresholdPeriod)) {
logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod);
TranslogService.this.flushThresholdPeriod = flushThresholdPeriod;
}
TimeValue interval = settings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, TranslogService.this.interval);
if (!interval.equals(TranslogService.this.interval)) {
logger.info("updating interval from [{}] to [{}]", TranslogService.this.interval, interval);
TranslogService.this.interval = interval;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, TranslogService.this.disableFlush);
if (disableFlush != TranslogService.this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", TranslogService.this.disableFlush, disableFlush);
TranslogService.this.disableFlush = disableFlush;
}
}
}
private TimeValue computeNextInterval() {
return new TimeValue(interval.millis() + (ThreadLocalRandom.current().nextLong(interval.millis())));
}
private class TranslogBasedFlush implements Runnable {
private volatile long lastFlushTime = System.currentTimeMillis();
@Override
public void run() {
if (indexShard.state() == IndexShardState.CLOSED) {
return;
}
// flush is disabled, but still reschedule
if (disableFlush) {
reschedule();
return;
}
Translog translog = indexShard.engine().getTranslog();
if (translog == null) {
reschedule();
return;
}
int currentNumberOfOperations = translog.totalOperations();
if (currentNumberOfOperations == 0) {
reschedule();
return;
}
if (flushThresholdOperations > 0) {
if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
asyncFlushAndReschedule();
return;
}
}
if (flushThresholdSize.bytes() > 0) {
long sizeInBytes = translog.sizeInBytes();
if (sizeInBytes > flushThresholdSize.bytes()) {
logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
asyncFlushAndReschedule();
return;
}
}
if (flushThresholdPeriod.millis() > 0) {
if ((threadPool.estimatedTimeInMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
asyncFlushAndReschedule();
return;
}
}
reschedule();
}
private void reschedule() {
future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, this);
}
private void asyncFlushAndReschedule() {
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
@Override
public void run() {
try {
indexShard.flush(new FlushRequest());
} catch (IllegalIndexShardStateException e) {
// we are being closed, or in created state, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Throwable e) {
logger.warn("failed to flush shard on translog threshold", e);
}
lastFlushTime = threadPool.estimatedTimeInMillis();
if (indexShard.state() != IndexShardState.CLOSED) {
future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, TranslogBasedFlush.this);
}
}
});
}
}
}

View File

@ -46,6 +46,8 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
@ -53,6 +55,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Mapping;
@ -77,6 +80,8 @@ import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -685,4 +690,77 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertTrue(postIndexWithExceptionCalled.get());
}
public void testMaybeFlush() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
IndexShard shard = test.shard(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index);
assertTrue(shard.shouldFlush());
assertEquals(2, shard.engine().getTranslog().totalOperations());
client().prepareIndex("test", "test", "2").setSource("{}").setRefresh(randomBoolean()).get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldFlush());
});
assertEquals(0, shard.engine().getTranslog().totalOperations());
long size = shard.engine().getTranslog().sizeInBytes();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1000)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES)).build()).get();
client().prepareDelete("test", "test", "2").get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldFlush());
});
assertEquals(0, shard.engine().getTranslog().totalOperations());
}
public void testStressMaybeFlush() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.shard(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
public void run() {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
while (running.get()) {
shard.maybeFlush();
}
}
};
threads[i].start();
}
barrier.await();
FlushStats flushStats = shard.flushStats();
long total = flushStats.getTotal();
client().prepareIndex("test", "test", "1").setSource("{}").get();
assertBusy(() -> {
assertEquals(total + 1, shard.flushStats().getTotal());
});
running.set(false);
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
}
}

View File

@ -62,7 +62,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
@ -157,7 +156,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -263,7 +262,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -491,7 +490,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -547,7 +546,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();

View File

@ -101,11 +101,11 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.TranslogWriter;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@ -496,19 +496,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000));
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000));
}
if (random.nextBoolean()) {
builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
}
if (random.nextBoolean()) {
builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TimeValue.timeValueMinutes(RandomInts.randomIntBetween(random, 1, 60)));
}
if (random.nextBoolean()) {
builder.put(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 1, 10000)));
}
if (random.nextBoolean()) {
builder.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean());
builder.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean());
}
if (random.nextBoolean()) {
builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
@ -1444,13 +1438,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
/** Disables translog flushing for the specified index */
public static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true).build();
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
public static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, false).build();
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, false).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}

View File

@ -35,16 +35,6 @@ Once the translog hits this size, a flush will happen. Defaults to `512mb`.
After how many operations to flush. Defaults to `unlimited`.
`index.translog.flush_threshold_period`::
How long to wait before triggering a flush regardless of translog size. Defaults to `30m`.
`index.translog.interval`::
How often to check if a flush is needed, randomized between the interval value
and 2x the interval value. Defaults to `5s`.
[float]
=== Translog settings