Merge pull request #15584 from s1monw/move_translog_syn_to_index_service
Move async translog sync logic into IndexService
This commit is contained in:
commit
8c898048bc
|
@ -1018,7 +1018,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
|
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null) {
|
||||||
indexShard.sync(location);
|
indexShard.sync(location);
|
||||||
}
|
}
|
||||||
indexShard.maybeFlush();
|
indexShard.maybeFlush();
|
||||||
|
|
|
@ -65,6 +65,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.ExtensionPoint;
|
import org.elasticsearch.common.util.ExtensionPoint;
|
||||||
import org.elasticsearch.gateway.GatewayAllocator;
|
import org.elasticsearch.gateway.GatewayAllocator;
|
||||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.indexing.IndexingSlowLog;
|
import org.elasticsearch.index.indexing.IndexingSlowLog;
|
||||||
import org.elasticsearch.index.search.stats.SearchSlowLog;
|
import org.elasticsearch.index.search.stats.SearchSlowLog;
|
||||||
|
@ -182,7 +183,7 @@ public class ClusterModule extends AbstractModule {
|
||||||
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
|
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
|
||||||
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
|
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
|
||||||
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
|
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
|
||||||
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
|
registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
|
||||||
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
|
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
|
||||||
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
|
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
|
||||||
registerIndexDynamicSetting(IndicesRequestCache.DEPRECATED_INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
|
registerIndexDynamicSetting(IndicesRequestCache.DEPRECATED_INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index;
|
||||||
import org.apache.lucene.search.BooleanClause;
|
import org.apache.lucene.search.BooleanClause;
|
||||||
import org.apache.lucene.search.BooleanQuery;
|
import org.apache.lucene.search.BooleanQuery;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.apache.lucene.util.Accountable;
|
import org.apache.lucene.util.Accountable;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||||
|
@ -39,6 +40,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
|
||||||
import org.elasticsearch.index.cache.IndexCache;
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||||
import org.elasticsearch.index.cache.query.QueryCache;
|
import org.elasticsearch.index.cache.query.QueryCache;
|
||||||
|
import org.elasticsearch.index.engine.EngineClosedException;
|
||||||
import org.elasticsearch.index.engine.EngineFactory;
|
import org.elasticsearch.index.engine.EngineFactory;
|
||||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||||
|
@ -57,9 +59,11 @@ import org.elasticsearch.index.shard.ShardPath;
|
||||||
import org.elasticsearch.index.similarity.SimilarityService;
|
import org.elasticsearch.index.similarity.SimilarityService;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.store.IndexStore;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.indices.AliasFilterParsingException;
|
import org.elasticsearch.indices.AliasFilterParsingException;
|
||||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -296,6 +300,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||||
eventListener.afterIndexShardCreated(indexShard);
|
eventListener.afterIndexShardCreated(indexShard);
|
||||||
indexShard.updateRoutingEntry(routing, true);
|
indexShard.updateRoutingEntry(routing, true);
|
||||||
|
if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) {
|
||||||
|
ThreadPool threadPool = nodeServicesProvider.getThreadPool();
|
||||||
|
new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service.
|
||||||
|
}
|
||||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||||
success = true;
|
success = true;
|
||||||
return indexShard;
|
return indexShard;
|
||||||
|
@ -565,5 +573,57 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||||
return indexStore;
|
return indexStore;
|
||||||
} // pkg private for testing
|
} // pkg private for testing
|
||||||
|
|
||||||
|
private void maybeFSyncTranslogs() {
|
||||||
|
if (indexSettings.getTranslogDurability() == Translog.Durability.ASYNC) {
|
||||||
|
for (IndexShard shard : this.shards.values()) {
|
||||||
|
try {
|
||||||
|
Translog translog = shard.getTranslog();
|
||||||
|
if (translog.syncNeeded()) {
|
||||||
|
translog.sync();
|
||||||
|
}
|
||||||
|
} catch (EngineClosedException | AlreadyClosedException ex) {
|
||||||
|
// fine - continue;
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("failed to sync translog", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FSyncs the translog for all shards of this index in a defined interval.
|
||||||
|
*/
|
||||||
|
final static class AsyncTranslogFSync implements Runnable {
|
||||||
|
private final IndexService indexService;
|
||||||
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) {
|
||||||
|
this.indexService = indexService;
|
||||||
|
this.threadPool = threadPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean mustRun() {
|
||||||
|
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
|
||||||
|
return (indexService.closed.get() || indexService.shards.isEmpty()) == false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void schedule() {
|
||||||
|
threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (mustRun()) {
|
||||||
|
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
|
||||||
|
indexService.maybeFSyncTranslogs();
|
||||||
|
if (mustRun()) {
|
||||||
|
schedule();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,12 +25,16 @@ import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
|
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
@ -48,6 +52,9 @@ public final class IndexSettings {
|
||||||
public static final String QUERY_STRING_ANALYZE_WILDCARD = "indices.query.query_string.analyze_wildcard";
|
public static final String QUERY_STRING_ANALYZE_WILDCARD = "indices.query.query_string.analyze_wildcard";
|
||||||
public static final String QUERY_STRING_ALLOW_LEADING_WILDCARD = "indices.query.query_string.allowLeadingWildcard";
|
public static final String QUERY_STRING_ALLOW_LEADING_WILDCARD = "indices.query.query_string.allowLeadingWildcard";
|
||||||
public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields";
|
public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields";
|
||||||
|
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
||||||
|
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
|
||||||
|
|
||||||
private final String uuid;
|
private final String uuid;
|
||||||
private final List<Consumer<Settings>> updateListeners;
|
private final List<Consumer<Settings>> updateListeners;
|
||||||
private final Index index;
|
private final Index index;
|
||||||
|
@ -67,6 +74,8 @@ public final class IndexSettings {
|
||||||
private final boolean queryStringAllowLeadingWildcard;
|
private final boolean queryStringAllowLeadingWildcard;
|
||||||
private final boolean defaultAllowUnmappedFields;
|
private final boolean defaultAllowUnmappedFields;
|
||||||
private final Predicate<String> indexNameMatcher;
|
private final Predicate<String> indexNameMatcher;
|
||||||
|
private volatile Translog.Durability durability;
|
||||||
|
private final TimeValue syncInterval;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the default search field for this index.
|
* Returns the default search field for this index.
|
||||||
|
@ -127,7 +136,7 @@ public final class IndexSettings {
|
||||||
public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, final Collection<Consumer<Settings>> updateListeners, final Predicate<String> indexNameMatcher) {
|
public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, final Collection<Consumer<Settings>> updateListeners, final Predicate<String> indexNameMatcher) {
|
||||||
this.nodeSettings = nodeSettings;
|
this.nodeSettings = nodeSettings;
|
||||||
this.settings = Settings.builder().put(nodeSettings).put(indexMetaData.getSettings()).build();
|
this.settings = Settings.builder().put(nodeSettings).put(indexMetaData.getSettings()).build();
|
||||||
this.updateListeners = Collections.unmodifiableList(new ArrayList<>(updateListeners));
|
this.updateListeners = Collections.unmodifiableList( new ArrayList<>(updateListeners));
|
||||||
this.index = new Index(indexMetaData.getIndex());
|
this.index = new Index(indexMetaData.getIndex());
|
||||||
version = Version.indexCreated(settings);
|
version = Version.indexCreated(settings);
|
||||||
uuid = settings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
|
uuid = settings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
|
||||||
|
@ -144,6 +153,10 @@ public final class IndexSettings {
|
||||||
this.parseFieldMatcher = new ParseFieldMatcher(settings);
|
this.parseFieldMatcher = new ParseFieldMatcher(settings);
|
||||||
this.defaultAllowUnmappedFields = settings.getAsBoolean(ALLOW_UNMAPPED, true);
|
this.defaultAllowUnmappedFields = settings.getAsBoolean(ALLOW_UNMAPPED, true);
|
||||||
this.indexNameMatcher = indexNameMatcher;
|
this.indexNameMatcher = indexNameMatcher;
|
||||||
|
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name());
|
||||||
|
this.durability = getFromSettings(settings, Translog.Durability.REQUEST);
|
||||||
|
syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
||||||
|
|
||||||
assert indexNameMatcher.test(indexMetaData.getIndex());
|
assert indexNameMatcher.test(indexMetaData.getIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,6 +308,11 @@ public final class IndexSettings {
|
||||||
logger.warn("failed to refresh index settings for [{}]", e, mergedSettings);
|
logger.warn("failed to refresh index settings for [{}]", e, mergedSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
updateSettings(mergedSettings);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("failed to refresh index settings for [{}]", e, mergedSettings);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,4 +322,34 @@ public final class IndexSettings {
|
||||||
List<Consumer<Settings>> getUpdateListeners() { // for testing
|
List<Consumer<Settings>> getUpdateListeners() { // for testing
|
||||||
return updateListeners;
|
return updateListeners;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the translog durability for this index.
|
||||||
|
*/
|
||||||
|
public Translog.Durability getTranslogDurability() {
|
||||||
|
return durability;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Translog.Durability getFromSettings(Settings settings, Translog.Durability defaultValue) {
|
||||||
|
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, defaultValue.name());
|
||||||
|
try {
|
||||||
|
return Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT));
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
logger.warn("Can't apply {} illegal value: {} using {} instead, use one of: {}", INDEX_TRANSLOG_DURABILITY, value, defaultValue, Arrays.toString(Translog.Durability.values()));
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateSettings(Settings settings) {
|
||||||
|
final Translog.Durability durability = getFromSettings(settings, this.durability);
|
||||||
|
if (durability != this.durability) {
|
||||||
|
logger.info("updating durability from [{}] to [{}]", this.durability, durability);
|
||||||
|
this.durability = durability;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimeValue getTranslogSyncInterval() {
|
||||||
|
return syncInterval;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,6 @@ import org.elasticsearch.index.termvectors.TermVectorsService;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
import org.elasticsearch.index.translog.TranslogWriter;
|
|
||||||
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
||||||
import org.elasticsearch.index.warmer.WarmerStats;
|
import org.elasticsearch.index.warmer.WarmerStats;
|
||||||
import org.elasticsearch.indices.IndicesWarmer;
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
|
@ -126,10 +125,8 @@ import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
@ -254,8 +251,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
logger.debug("state: [CREATED]");
|
logger.debug("state: [CREATED]");
|
||||||
|
|
||||||
this.checkIndexOnStartup = settings.get("index.shard.check_on_startup", "false");
|
this.checkIndexOnStartup = settings.get("index.shard.check_on_startup", "false");
|
||||||
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, settings, Translog.Durabilty.REQUEST),
|
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
|
||||||
provider.getBigArrays(), threadPool);
|
provider.getBigArrays());
|
||||||
final QueryCachingPolicy cachingPolicy;
|
final QueryCachingPolicy cachingPolicy;
|
||||||
// the query cache is a node-level thing, however we want the most popular filters
|
// the query cache is a node-level thing, however we want the most popular filters
|
||||||
// to be computed on a per-shard basis
|
// to be computed on a per-shard basis
|
||||||
|
@ -1159,12 +1156,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
this.flushOnClose = flushOnClose;
|
this.flushOnClose = flushOnClose;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
|
|
||||||
if (durabilty != translogConfig.getDurabilty()) {
|
|
||||||
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
|
|
||||||
translogConfig.setDurabilty(durabilty);
|
|
||||||
}
|
|
||||||
|
|
||||||
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
|
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
|
||||||
if (!refreshInterval.equals(this.refreshInterval)) {
|
if (!refreshInterval.equals(this.refreshInterval)) {
|
||||||
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
|
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
|
||||||
|
@ -1540,18 +1531,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
/**
|
/**
|
||||||
* Returns the current translog durability mode
|
* Returns the current translog durability mode
|
||||||
*/
|
*/
|
||||||
public Translog.Durabilty getTranslogDurability() {
|
public Translog.Durability getTranslogDurability() {
|
||||||
return translogConfig.getDurabilty();
|
return indexSettings.getTranslogDurability();
|
||||||
}
|
|
||||||
|
|
||||||
private static Translog.Durabilty getFromSettings(ESLogger logger, Settings settings, Translog.Durabilty defaultValue) {
|
|
||||||
final String value = settings.get(TranslogConfig.INDEX_TRANSLOG_DURABILITY, defaultValue.name());
|
|
||||||
try {
|
|
||||||
return Translog.Durabilty.valueOf(value.toUpperCase(Locale.ROOT));
|
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
logger.warn("Can't apply {} illegal value: {} using {} instead, use one of: {}", TranslogConfig.INDEX_TRANSLOG_DURABILITY, value, defaultValue, Arrays.toString(Translog.Durabilty.values()));
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
|
private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
|
@ -160,9 +159,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
writeLock = new ReleasableLock(rwl.writeLock());
|
writeLock = new ReleasableLock(rwl.writeLock());
|
||||||
this.location = config.getTranslogPath();
|
this.location = config.getTranslogPath();
|
||||||
Files.createDirectories(this.location);
|
Files.createDirectories(this.location);
|
||||||
if (config.getSyncInterval().millis() > 0 && config.getThreadPool() != null) {
|
|
||||||
syncScheduler = config.getThreadPool().schedule(config.getSyncInterval(), ThreadPool.Names.SAME, new Sync());
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (translogGeneration != null) {
|
if (translogGeneration != null) {
|
||||||
|
@ -715,34 +711,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Sync implements Runnable {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
// don't re-schedule if its closed..., we are done
|
|
||||||
if (closed.get()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final ThreadPool threadPool = config.getThreadPool();
|
|
||||||
if (syncNeeded()) {
|
|
||||||
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
sync();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("failed to sync translog", e);
|
|
||||||
}
|
|
||||||
if (closed.get() == false) {
|
|
||||||
syncScheduler = threadPool.schedule(config.getSyncInterval(), ThreadPool.Names.SAME, Sync.this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
syncScheduler = threadPool.schedule(config.getSyncInterval(), ThreadPool.Names.SAME, Sync.this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Location implements Accountable, Comparable<Location> {
|
public static class Location implements Accountable, Comparable<Location> {
|
||||||
|
|
||||||
public final long generation;
|
public final long generation;
|
||||||
|
@ -1188,7 +1156,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public enum Durabilty {
|
public enum Durability {
|
||||||
/**
|
/**
|
||||||
* Async durability - translogs are synced based on a time interval.
|
* Async durability - translogs are synced based on a time interval.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -38,16 +38,9 @@ import java.nio.file.Path;
|
||||||
*/
|
*/
|
||||||
public final class TranslogConfig {
|
public final class TranslogConfig {
|
||||||
|
|
||||||
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
|
|
||||||
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
|
||||||
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
|
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
|
||||||
|
|
||||||
private final TimeValue syncInterval;
|
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private final ThreadPool threadPool;
|
|
||||||
private final boolean syncOnEachOperation;
|
|
||||||
private volatile TranslogGeneration translogGeneration;
|
private volatile TranslogGeneration translogGeneration;
|
||||||
private volatile Translog.Durabilty durabilty = Translog.Durabilty.REQUEST;
|
|
||||||
private final IndexSettings indexSettings;
|
private final IndexSettings indexSettings;
|
||||||
private final ShardId shardId;
|
private final ShardId shardId;
|
||||||
private final Path translogPath;
|
private final Path translogPath;
|
||||||
|
@ -58,67 +51,25 @@ public final class TranslogConfig {
|
||||||
* @param shardId the shard ID this translog belongs to
|
* @param shardId the shard ID this translog belongs to
|
||||||
* @param translogPath the path to use for the transaction log files
|
* @param translogPath the path to use for the transaction log files
|
||||||
* @param indexSettings the index settings used to set internal variables
|
* @param indexSettings the index settings used to set internal variables
|
||||||
* @param durabilty the default durability setting for the translog
|
|
||||||
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
|
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
|
||||||
* @param threadPool a {@link ThreadPool} to schedule async sync durability
|
|
||||||
*/
|
*/
|
||||||
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool) {
|
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
|
||||||
this(shardId, translogPath, indexSettings, durabilty, bigArrays, threadPool, DEFAULT_BUFFER_SIZE);
|
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool, ByteSizeValue bufferSize) {
|
TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) {
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
this.indexSettings = indexSettings;
|
this.indexSettings = indexSettings;
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
this.translogPath = translogPath;
|
this.translogPath = translogPath;
|
||||||
this.durabilty = durabilty;
|
|
||||||
this.threadPool = threadPool;
|
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
|
|
||||||
syncInterval = indexSettings.getSettings().getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
|
||||||
if (syncInterval.millis() > 0 && threadPool != null) {
|
|
||||||
syncOnEachOperation = false;
|
|
||||||
} else if (syncInterval.millis() == 0) {
|
|
||||||
syncOnEachOperation = true;
|
|
||||||
} else {
|
|
||||||
syncOnEachOperation = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link ThreadPool} to schedule async durability operations
|
|
||||||
*/
|
|
||||||
public ThreadPool getThreadPool() {
|
|
||||||
return threadPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the current durability mode of this translog.
|
|
||||||
*/
|
|
||||||
public Translog.Durabilty getDurabilty() {
|
|
||||||
return durabilty;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the current durability mode for the translog.
|
|
||||||
*/
|
|
||||||
public void setDurabilty(Translog.Durabilty durabilty) {
|
|
||||||
this.durabilty = durabilty;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns <code>true</code> iff each low level operation shoudl be fsynced
|
* Returns <code>true</code> iff each low level operation shoudl be fsynced
|
||||||
*/
|
*/
|
||||||
public boolean isSyncOnEachOperation() {
|
public boolean isSyncOnEachOperation() {
|
||||||
return syncOnEachOperation;
|
return indexSettings.getTranslogSyncInterval().millis() == 0;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the current async fsync interval
|
|
||||||
*/
|
|
||||||
public TimeValue getSyncInterval() {
|
|
||||||
return syncInterval;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.VersionUtils;
|
import org.elasticsearch.test.VersionUtils;
|
||||||
|
|
||||||
|
@ -158,4 +159,22 @@ public class IndexSettingsTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testUpdateDurability() {
|
||||||
|
IndexMetaData metaData = newIndexMeta("index", Settings.settingsBuilder()
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, "async")
|
||||||
|
.build());
|
||||||
|
IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY, Collections.emptyList());
|
||||||
|
assertEquals(Translog.Durability.ASYNC, settings.getTranslogDurability());
|
||||||
|
settings.updateIndexMetaData(newIndexMeta("index", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, "request").build()));
|
||||||
|
assertEquals(Translog.Durability.REQUEST, settings.getTranslogDurability());
|
||||||
|
|
||||||
|
metaData = newIndexMeta("index", Settings.settingsBuilder()
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.build());
|
||||||
|
settings = new IndexSettings(metaData, Settings.EMPTY, Collections.emptyList());
|
||||||
|
assertEquals(Translog.Durability.REQUEST, settings.getTranslogDurability()); // test default
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,7 +253,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Translog createTranslog(Path translogPath) throws IOException {
|
protected Translog createTranslog(Path translogPath) throws IOException {
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
return new Translog(translogConfig);
|
return new Translog(translogConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,7 +271,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
|
|
||||||
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
||||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
|
|
||||||
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, INDEX_SETTINGS), indexSettings
|
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, INDEX_SETTINGS), indexSettings
|
||||||
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
|
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
|
||||||
|
@ -1975,14 +1975,14 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
|
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
|
||||||
engine.close();
|
engine.close();
|
||||||
|
|
||||||
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool));
|
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE));
|
||||||
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
|
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
|
||||||
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
|
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
|
||||||
translog.close();
|
translog.close();
|
||||||
|
|
||||||
EngineConfig config = engine.config();
|
EngineConfig config = engine.config();
|
||||||
/* create a TranslogConfig that has been created with a different UUID */
|
/* create a TranslogConfig that has been created with a different UUID */
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
|
|
||||||
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
|
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
|
||||||
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
|
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
|
||||||
|
|
|
@ -224,7 +224,7 @@ public class ShadowEngineTests extends ESTestCase {
|
||||||
|
|
||||||
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
||||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
|
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
|
||||||
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
|
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
|
||||||
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {
|
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {
|
||||||
|
|
|
@ -69,6 +69,7 @@ import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.env.ShardLock;
|
import org.elasticsearch.env.ShardLock;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.NodeServicesProvider;
|
import org.elasticsearch.index.NodeServicesProvider;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineException;
|
import org.elasticsearch.index.engine.EngineException;
|
||||||
|
@ -87,7 +88,6 @@ import org.elasticsearch.index.snapshots.IndexShardRepository;
|
||||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
import org.elasticsearch.test.DummyShardLock;
|
import org.elasticsearch.test.DummyShardLock;
|
||||||
|
@ -391,35 +391,35 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
IndexService test = indicesService.indexService("test");
|
IndexService test = indicesService.indexService("test");
|
||||||
IndexShard shard = test.getShardOrNull(0);
|
IndexShard shard = test.getShardOrNull(0);
|
||||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
setDurability(shard, Translog.Durability.REQUEST);
|
||||||
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
||||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
setDurability(shard, Translog.Durability.ASYNC);
|
||||||
client().prepareIndex("test", "bar", "2").setSource("{}").get();
|
client().prepareIndex("test", "bar", "2").setSource("{}").get();
|
||||||
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
||||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
setDurability(shard, Translog.Durability.REQUEST);
|
||||||
client().prepareDelete("test", "bar", "1").get();
|
client().prepareDelete("test", "bar", "1").get();
|
||||||
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
||||||
|
|
||||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
setDurability(shard, Translog.Durability.ASYNC);
|
||||||
client().prepareDelete("test", "bar", "2").get();
|
client().prepareDelete("test", "bar", "2").get();
|
||||||
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
||||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
setDurability(shard, Translog.Durability.REQUEST);
|
||||||
assertNoFailures(client().prepareBulk()
|
assertNoFailures(client().prepareBulk()
|
||||||
.add(client().prepareIndex("test", "bar", "3").setSource("{}"))
|
.add(client().prepareIndex("test", "bar", "3").setSource("{}"))
|
||||||
.add(client().prepareDelete("test", "bar", "1")).get());
|
.add(client().prepareDelete("test", "bar", "1")).get());
|
||||||
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
assertFalse(shard.getEngine().getTranslog().syncNeeded());
|
||||||
|
|
||||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
setDurability(shard, Translog.Durability.ASYNC);
|
||||||
assertNoFailures(client().prepareBulk()
|
assertNoFailures(client().prepareBulk()
|
||||||
.add(client().prepareIndex("test", "bar", "4").setSource("{}"))
|
.add(client().prepareIndex("test", "bar", "4").setSource("{}"))
|
||||||
.add(client().prepareDelete("test", "bar", "3")).get());
|
.add(client().prepareDelete("test", "bar", "3")).get());
|
||||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
setDurability(shard, Translog.Durability.REQUEST);
|
||||||
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
assertTrue(shard.getEngine().getTranslog().syncNeeded());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setDurability(IndexShard shard, Translog.Durabilty durabilty) {
|
private void setDurability(IndexShard shard, Translog.Durability durability) {
|
||||||
client().admin().indices().prepareUpdateSettings(shard.shardId.getIndex()).setSettings(settingsBuilder().put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, durabilty.name()).build()).get();
|
client().admin().indices().prepareUpdateSettings(shard.shardId.getIndex()).setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, durability.name()).build()).get();
|
||||||
assertEquals(durabilty, shard.getTranslogDurability());
|
assertEquals(durability, shard.getTranslogDurability());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMinimumCompatVersion() {
|
public void testMinimumCompatVersion() {
|
||||||
|
@ -691,7 +691,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMaybeFlush() throws Exception {
|
public void testMaybeFlush() throws Exception {
|
||||||
createIndex("test", settingsBuilder().put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.REQUEST).build());
|
createIndex("test", settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST).build());
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
IndexService test = indicesService.indexService("test");
|
IndexService test = indicesService.indexService("test");
|
||||||
|
|
|
@ -27,18 +27,17 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
import org.elasticsearch.common.io.PathUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
|
||||||
import org.elasticsearch.monitor.fs.FsInfo;
|
import org.elasticsearch.monitor.fs.FsInfo;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
||||||
import org.elasticsearch.test.transport.MockTransportService;
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -53,6 +52,7 @@ import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
|
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||||
|
@ -71,7 +71,6 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
|
||||||
return pluginList(MockTransportService.TestPlugin.class);
|
return pluginList(MockTransportService.TestPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@TestLogging("index.translog:TRACE,index.gateway:TRACE")
|
|
||||||
public void testCorruptTranslogFiles() throws Exception {
|
public void testCorruptTranslogFiles() throws Exception {
|
||||||
internalCluster().startNodesAsync(1, Settings.EMPTY).get();
|
internalCluster().startNodesAsync(1, Settings.EMPTY).get();
|
||||||
|
|
||||||
|
@ -81,7 +80,6 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
|
||||||
.put("index.refresh_interval", "-1")
|
.put("index.refresh_interval", "-1")
|
||||||
.put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
|
.put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
|
||||||
.put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
|
.put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
|
||||||
.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, "1s") // fsync the translog every second
|
|
||||||
));
|
));
|
||||||
ensureYellow();
|
ensureYellow();
|
||||||
|
|
||||||
|
@ -99,14 +97,13 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
|
||||||
|
|
||||||
// Restart the single node
|
// Restart the single node
|
||||||
internalCluster().fullRestart();
|
internalCluster().fullRestart();
|
||||||
// node needs time to start recovery and discover the translog corruption
|
client().admin().cluster().prepareHealth().setWaitForYellowStatus().setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).setWaitForEvents(Priority.LANGUID).get();
|
||||||
Thread.sleep(1000);
|
|
||||||
enableTranslogFlush("test");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
client().prepareSearch("test").setQuery(matchAllQuery()).get();
|
client().prepareSearch("test").setQuery(matchAllQuery()).get();
|
||||||
fail("all shards should be failed due to a corrupted translog");
|
fail("all shards should be failed due to a corrupted translog");
|
||||||
} catch (SearchPhaseExecutionException e) {
|
} catch (SearchPhaseExecutionException e) {
|
||||||
|
e.printStackTrace();
|
||||||
// Good, all shards should be failed because there is only a
|
// Good, all shards should be failed because there is only a
|
||||||
// single shard and its translog is corrupt
|
// single shard and its translog is corrupt
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
||||||
.build();
|
.build();
|
||||||
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
|
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
|
||||||
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null, bufferSize);
|
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
|
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||||
|
@ -46,7 +47,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.index.shard.MergePolicyConfig;
|
import org.elasticsearch.index.shard.MergePolicyConfig;
|
||||||
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.store.IndexStore;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
@ -316,7 +317,7 @@ public class IndexStatsIT extends ESIntegTestCase {
|
||||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
|
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
|
||||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "1")
|
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "1")
|
||||||
.put("index.merge.policy.type", "tiered")
|
.put("index.merge.policy.type", "tiered")
|
||||||
.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, "ASYNC")
|
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC.name())
|
||||||
));
|
));
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
long termUpto = 0;
|
long termUpto = 0;
|
||||||
|
|
|
@ -29,9 +29,9 @@ import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.math.MathUtils;
|
import org.elasticsearch.common.math.MathUtils;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.DocsStats;
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.test.BackgroundIndexer;
|
import org.elasticsearch.test.BackgroundIndexer;
|
||||||
|
@ -55,7 +55,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
||||||
public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
|
public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
|
||||||
logger.info("--> creating test index ...");
|
logger.info("--> creating test index ...");
|
||||||
int numberOfShards = numberOfShards();
|
int numberOfShards = numberOfShards();
|
||||||
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
|
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)));
|
||||||
|
|
||||||
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
||||||
int waitFor = totalNumDocs / 10;
|
int waitFor = totalNumDocs / 10;
|
||||||
|
@ -108,7 +108,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
||||||
public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
|
public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
|
||||||
logger.info("--> creating test index ...");
|
logger.info("--> creating test index ...");
|
||||||
int numberOfShards = numberOfShards();
|
int numberOfShards = numberOfShards();
|
||||||
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
|
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)));
|
||||||
|
|
||||||
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
||||||
int waitFor = totalNumDocs / 10;
|
int waitFor = totalNumDocs / 10;
|
||||||
|
@ -159,7 +159,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
||||||
public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
|
public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
|
||||||
logger.info("--> creating test index ...");
|
logger.info("--> creating test index ...");
|
||||||
int numberOfShards = numberOfShards();
|
int numberOfShards = numberOfShards();
|
||||||
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
|
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)));
|
||||||
|
|
||||||
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
||||||
int waitFor = totalNumDocs / 10;
|
int waitFor = totalNumDocs / 10;
|
||||||
|
@ -230,7 +230,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
||||||
final int numReplicas = 0;
|
final int numReplicas = 0;
|
||||||
logger.info("--> creating test index ...");
|
logger.info("--> creating test index ...");
|
||||||
int allowNodes = 2;
|
int allowNodes = 2;
|
||||||
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
|
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)));
|
||||||
|
|
||||||
final int numDocs = scaledRandomIntBetween(200, 9999);
|
final int numDocs = scaledRandomIntBetween(200, 9999);
|
||||||
|
|
||||||
|
|
|
@ -95,6 +95,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.MockEngineFactoryPlugin;
|
import org.elasticsearch.index.MockEngineFactoryPlugin;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.codec.CodecService;
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||||
|
@ -104,8 +105,6 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.MergePolicyConfig;
|
import org.elasticsearch.index.shard.MergePolicyConfig;
|
||||||
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
|
||||||
import org.elasticsearch.index.translog.TranslogWriter;
|
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||||
import org.elasticsearch.indices.store.IndicesStore;
|
import org.elasticsearch.indices.store.IndicesStore;
|
||||||
|
@ -518,14 +517,14 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
||||||
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
|
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
|
||||||
}
|
}
|
||||||
if (random.nextBoolean()) {
|
if (random.nextBoolean()) {
|
||||||
builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
|
builder.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durability.values()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (random.nextBoolean()) {
|
if (random.nextBoolean()) {
|
||||||
if (rarely(random)) {
|
if (rarely(random)) {
|
||||||
builder.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, 0); // 0 has special meaning to sync each op
|
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, 0); // 0 has special meaning to sync each op
|
||||||
} else {
|
} else {
|
||||||
builder.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
|
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue