Move async translog sync logic into IndexService

Today the logic to async - commit the translog is in every translog instance
itself. While the setting is a per index setting we manageing it per shard. This
polluts the translog code and can more easily be managed in IndexService.
This commit is contained in:
Simon Willnauer 2015-12-11 14:55:41 +01:00
parent 445be98e4c
commit c3f901400c
15 changed files with 162 additions and 138 deletions

View File

@ -65,6 +65,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.indexing.IndexingSlowLog;
import org.elasticsearch.index.search.stats.SearchSlowLog;
@ -182,7 +183,7 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
registerIndexDynamicSetting(IndicesRequestCache.DEPRECATED_INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.metadata.AliasMetaData;
@ -39,6 +40,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -57,9 +59,11 @@ import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
@ -296,6 +300,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
indexShard.updateRoutingEntry(routing, true);
if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) {
ThreadPool threadPool = nodeServicesProvider.getThreadPool();
new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service.
}
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard;
@ -565,5 +573,57 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return indexStore;
} // pkg private for testing
private void maybeFSyncTranslogs() {
if (indexSettings.getTranslogDurability() == Translog.Durabilty.ASYNC) {
for (IndexShard shard : this.shards.values()) {
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
translog.sync();
}
} catch (EngineClosedException | AlreadyClosedException ex) {
// fine - continue;
} catch (IOException e) {
logger.warn("failed to sync translog", e);
}
}
}
}
/**
* FSyncs the translog for all shards of this index in a defined interval.
*/
final static class AsyncTranslogFSync implements Runnable {
private final IndexService indexService;
private final ThreadPool threadPool;
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) {
this.indexService = indexService;
this.threadPool = threadPool;
}
boolean mustRun() {
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
return (indexService.closed.get() || indexService.shards.isEmpty()) == false;
}
void schedule() {
threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this);
}
@Override
public void run() {
if (mustRun()) {
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
indexService.maybeFSyncTranslogs();
if (mustRun()) {
schedule();
}
});
}
}
}
}

View File

@ -25,12 +25,11 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.translog.Translog;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -48,6 +47,9 @@ public final class IndexSettings {
public static final String QUERY_STRING_ANALYZE_WILDCARD = "indices.query.query_string.analyze_wildcard";
public static final String QUERY_STRING_ALLOW_LEADING_WILDCARD = "indices.query.query_string.allowLeadingWildcard";
public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
private final String uuid;
private final List<Consumer<Settings>> updateListeners;
private final Index index;
@ -67,6 +69,8 @@ public final class IndexSettings {
private final boolean queryStringAllowLeadingWildcard;
private final boolean defaultAllowUnmappedFields;
private final Predicate<String> indexNameMatcher;
private volatile Translog.Durabilty durabilty;
private final TimeValue syncInterval;
/**
* Returns the default search field for this index.
@ -127,7 +131,7 @@ public final class IndexSettings {
public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, final Collection<Consumer<Settings>> updateListeners, final Predicate<String> indexNameMatcher) {
this.nodeSettings = nodeSettings;
this.settings = Settings.builder().put(nodeSettings).put(indexMetaData.getSettings()).build();
this.updateListeners = Collections.unmodifiableList(new ArrayList<>(updateListeners));
this.updateListeners = Collections.unmodifiableList( new ArrayList<>(updateListeners));
this.index = new Index(indexMetaData.getIndex());
version = Version.indexCreated(settings);
uuid = settings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
@ -144,6 +148,10 @@ public final class IndexSettings {
this.parseFieldMatcher = new ParseFieldMatcher(settings);
this.defaultAllowUnmappedFields = settings.getAsBoolean(ALLOW_UNMAPPED, true);
this.indexNameMatcher = indexNameMatcher;
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.REQUEST.name());
this.durabilty = getFromSettings(settings, Translog.Durabilty.REQUEST);
syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
assert indexNameMatcher.test(indexMetaData.getIndex());
}
@ -295,6 +303,11 @@ public final class IndexSettings {
logger.warn("failed to refresh index settings for [{}]", e, mergedSettings);
}
}
try {
updateSettings(mergedSettings);
} catch (Exception e) {
logger.warn("failed to refresh index settings for [{}]", e, mergedSettings);
}
return true;
}
@ -304,4 +317,34 @@ public final class IndexSettings {
List<Consumer<Settings>> getUpdateListeners() { // for testing
return updateListeners;
}
/**
* Returns the translog durability for this index.
*/
public Translog.Durabilty getTranslogDurability() {
return durabilty;
}
public Translog.Durabilty getFromSettings(Settings settings, Translog.Durabilty defaultValue) {
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, defaultValue.name());
try {
return Translog.Durabilty.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException ex) {
logger.warn("Can't apply {} illegal value: {} using {} instead, use one of: {}", INDEX_TRANSLOG_DURABILITY, value, defaultValue, Arrays.toString(Translog.Durabilty.values()));
return defaultValue;
}
}
private void updateSettings(Settings settings) {
final Translog.Durabilty durabilty = getFromSettings(settings, this.durabilty);
if (durabilty != this.durabilty) {
logger.info("updating durability from [{}] to [{}]", this.durabilty, durabilty);
this.durabilty = durabilty;
}
}
public TimeValue getTranslogSyncInterval() {
return syncInterval;
}
}

View File

@ -254,8 +254,8 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.debug("state: [CREATED]");
this.checkIndexOnStartup = settings.get("index.shard.check_on_startup", "false");
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, settings, Translog.Durabilty.REQUEST),
provider.getBigArrays(), threadPool);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
provider.getBigArrays());
final QueryCachingPolicy cachingPolicy;
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
@ -1159,12 +1159,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.flushOnClose = flushOnClose;
}
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
if (durabilty != translogConfig.getDurabilty()) {
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
translogConfig.setDurabilty(durabilty);
}
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
if (!refreshInterval.equals(this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
@ -1541,17 +1535,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* Returns the current translog durability mode
*/
public Translog.Durabilty getTranslogDurability() {
return translogConfig.getDurabilty();
}
private static Translog.Durabilty getFromSettings(ESLogger logger, Settings settings, Translog.Durabilty defaultValue) {
final String value = settings.get(TranslogConfig.INDEX_TRANSLOG_DURABILITY, defaultValue.name());
try {
return Translog.Durabilty.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException ex) {
logger.warn("Can't apply {} illegal value: {} using {} instead, use one of: {}", TranslogConfig.INDEX_TRANSLOG_DURABILITY, value, defaultValue, Arrays.toString(Translog.Durabilty.values()));
return defaultValue;
}
return indexSettings.getTranslogDurability();
}
private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();

View File

@ -47,7 +47,6 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.EOFException;
@ -160,9 +159,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
writeLock = new ReleasableLock(rwl.writeLock());
this.location = config.getTranslogPath();
Files.createDirectories(this.location);
if (config.getSyncInterval().millis() > 0 && config.getThreadPool() != null) {
syncScheduler = config.getThreadPool().schedule(config.getSyncInterval(), ThreadPool.Names.SAME, new Sync());
}
try {
if (translogGeneration != null) {
@ -715,34 +711,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
class Sync implements Runnable {
@Override
public void run() {
// don't re-schedule if its closed..., we are done
if (closed.get()) {
return;
}
final ThreadPool threadPool = config.getThreadPool();
if (syncNeeded()) {
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
@Override
public void run() {
try {
sync();
} catch (Exception e) {
logger.warn("failed to sync translog", e);
}
if (closed.get() == false) {
syncScheduler = threadPool.schedule(config.getSyncInterval(), ThreadPool.Names.SAME, Sync.this);
}
}
});
} else {
syncScheduler = threadPool.schedule(config.getSyncInterval(), ThreadPool.Names.SAME, Sync.this);
}
}
}
public static class Location implements Accountable, Comparable<Location> {
public final long generation;

View File

@ -38,16 +38,9 @@ import java.nio.file.Path;
*/
public final class TranslogConfig {
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
private final TimeValue syncInterval;
private final BigArrays bigArrays;
private final ThreadPool threadPool;
private final boolean syncOnEachOperation;
private volatile TranslogGeneration translogGeneration;
private volatile Translog.Durabilty durabilty = Translog.Durabilty.REQUEST;
private final IndexSettings indexSettings;
private final ShardId shardId;
private final Path translogPath;
@ -58,67 +51,25 @@ public final class TranslogConfig {
* @param shardId the shard ID this translog belongs to
* @param translogPath the path to use for the transaction log files
* @param indexSettings the index settings used to set internal variables
* @param durabilty the default durability setting for the translog
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
* @param threadPool a {@link ThreadPool} to schedule async sync durability
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool) {
this(shardId, translogPath, indexSettings, durabilty, bigArrays, threadPool, DEFAULT_BUFFER_SIZE);
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE);
}
TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool, ByteSizeValue bufferSize) {
TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.durabilty = durabilty;
this.threadPool = threadPool;
this.bigArrays = bigArrays;
syncInterval = indexSettings.getSettings().getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {
syncOnEachOperation = false;
} else if (syncInterval.millis() == 0) {
syncOnEachOperation = true;
} else {
syncOnEachOperation = false;
}
}
/**
* Returns a {@link ThreadPool} to schedule async durability operations
*/
public ThreadPool getThreadPool() {
return threadPool;
}
/**
* Returns the current durability mode of this translog.
*/
public Translog.Durabilty getDurabilty() {
return durabilty;
}
/**
* Sets the current durability mode for the translog.
*/
public void setDurabilty(Translog.Durabilty durabilty) {
this.durabilty = durabilty;
}
/**
* Returns <code>true</code> iff each low level operation shoudl be fsynced
*/
public boolean isSyncOnEachOperation() {
return syncOnEachOperation;
}
/**
* Returns the current async fsync interval
*/
public TimeValue getSyncInterval() {
return syncInterval;
return indexSettings.getTranslogSyncInterval().millis() == 0;
}
/**

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -158,4 +159,22 @@ public class IndexSettingsTests extends ESTestCase {
}
public void testUpdateDurability() {
IndexMetaData metaData = newIndexMeta("index", Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, "async")
.build());
IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY, Collections.emptyList());
assertEquals(Translog.Durabilty.ASYNC, settings.getTranslogDurability());
settings.updateIndexMetaData(newIndexMeta("index", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, "request").build()));
assertEquals(Translog.Durabilty.REQUEST, settings.getTranslogDurability());
metaData = newIndexMeta("index", Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build());
settings = new IndexSettings(metaData, Settings.EMPTY, Collections.emptyList());
assertEquals(Translog.Durabilty.REQUEST, settings.getTranslogDurability()); // test default
}
}

View File

@ -253,7 +253,7 @@ public class InternalEngineTests extends ESTestCase {
}
protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
return new Translog(translogConfig);
}
@ -271,7 +271,7 @@ public class InternalEngineTests extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, INDEX_SETTINGS), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
@ -1975,14 +1975,14 @@ public class InternalEngineTests extends ESTestCase {
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
engine.close();
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool));
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE));
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();
EngineConfig config = engine.config();
/* create a TranslogConfig that has been created with a different UUID */
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),

View File

@ -224,7 +224,7 @@ public class ShadowEngineTests extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {

View File

@ -69,6 +69,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
@ -87,7 +88,6 @@ import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.DummyShardLock;
@ -418,7 +418,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
private void setDurability(IndexShard shard, Translog.Durabilty durabilty) {
client().admin().indices().prepareUpdateSettings(shard.shardId.getIndex()).setSettings(settingsBuilder().put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, durabilty.name()).build()).get();
client().admin().indices().prepareUpdateSettings(shard.shardId.getIndex()).setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, durabilty.name()).build()).get();
assertEquals(durabilty, shard.getTranslogDurability());
}
@ -691,7 +691,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
public void testMaybeFlush() throws Exception {
createIndex("test", settingsBuilder().put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.REQUEST).build());
createIndex("test", settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.REQUEST).build());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");

View File

@ -27,18 +27,17 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import java.io.IOException;
@ -53,6 +52,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -71,7 +71,6 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
return pluginList(MockTransportService.TestPlugin.class);
}
@TestLogging("index.translog:TRACE,index.gateway:TRACE")
public void testCorruptTranslogFiles() throws Exception {
internalCluster().startNodesAsync(1, Settings.EMPTY).get();
@ -81,7 +80,6 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
.put("index.refresh_interval", "-1")
.put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
.put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, "1s") // fsync the translog every second
));
ensureYellow();
@ -99,14 +97,13 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
// Restart the single node
internalCluster().fullRestart();
// node needs time to start recovery and discover the translog corruption
Thread.sleep(1000);
enableTranslogFlush("test");
client().admin().cluster().prepareHealth().setWaitForYellowStatus().setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).setWaitForEvents(Priority.LANGUID).get();
try {
client().prepareSearch("test").setQuery(matchAllQuery()).get();
fail("all shards should be failed due to a corrupted translog");
} catch (SearchPhaseExecutionException e) {
e.printStackTrace();
// Good, all shards should be failed because there is only a
// single shard and its translog is corrupt
}

View File

@ -139,7 +139,7 @@ public class TranslogTests extends ESTestCase {
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null, bufferSize);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
}
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -46,6 +47,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.search.sort.SortOrder;
@ -316,7 +318,7 @@ public class IndexStatsIT extends ESIntegTestCase {
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "1")
.put("index.merge.policy.type", "tiered")
.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, "ASYNC")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC.name())
));
ensureGreen();
long termUpto = 0;

View File

@ -29,9 +29,9 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.BackgroundIndexer;
@ -55,7 +55,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
int waitFor = totalNumDocs / 10;
@ -108,7 +108,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
int waitFor = totalNumDocs / 10;
@ -159,7 +159,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
int waitFor = totalNumDocs / 10;
@ -230,7 +230,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
final int numReplicas = 0;
logger.info("--> creating test index ...");
int allowNodes = 2;
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
final int numDocs = scaledRandomIntBetween(200, 9999);

View File

@ -95,6 +95,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -104,8 +105,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogWriter;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.store.IndicesStore;
@ -518,14 +517,14 @@ public abstract class ESIntegTestCase extends ESTestCase {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
}
if (random.nextBoolean()) {
builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
builder.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
}
if (random.nextBoolean()) {
if (rarely(random)) {
builder.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, 0); // 0 has special meaning to sync each op
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, 0); // 0 has special meaning to sync each op
} else {
builder.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
}
}