Move RefreshTask into IndexService and use since task per index

`refresh_interval` is a per index setting but we interpret and maintain it per shard. This
change moves the refresh task outside of IndexShard to the IndexService where it logically belongs
and reuses scheduling infrastructure used for translog fsync (async commit).

This change will use the same task for all shards of an index while previously we used on thread/task
per shard to refresh. This will also prevent too many concurrent refreshes if there are many indices and
shards allocated on a single node.
This commit is contained in:
Simon Willnauer 2016-01-12 17:40:54 +01:00
parent b4a095d430
commit 59211927b6
9 changed files with 334 additions and 124 deletions

View File

@ -148,7 +148,7 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY);
registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER);
registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY); registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME);
registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY); registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY);
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);

View File

@ -21,12 +21,14 @@ package org.elasticsearch.index;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -42,6 +44,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -104,6 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final IndexSettings indexSettings; private final IndexSettings indexSettings;
private final IndexingSlowLog slowLog; private final IndexingSlowLog slowLog;
private final IndexingOperationListener[] listeners; private final IndexingOperationListener[] listeners;
private volatile RefreshTasks refreshTask;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService, SimilarityService similarityService,
@ -140,6 +145,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.listeners = new IndexingOperationListener[1+listenersIn.length]; this.listeners = new IndexingOperationListener[1+listenersIn.length];
this.listeners[0] = slowLog; this.listeners[0] = slowLog;
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool());
} }
public int numberOfShards() { public int numberOfShards() {
@ -310,9 +316,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard); eventListener.afterIndexShardCreated(indexShard);
indexShard.updateRoutingEntry(routing, true); indexShard.updateRoutingEntry(routing, true);
if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) { if (shards.isEmpty()) {
ThreadPool threadPool = nodeServicesProvider.getThreadPool(); ThreadPool threadPool = nodeServicesProvider.getThreadPool();
new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service. if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service.
}
rescheduleRefreshTasks();
} }
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true; success = true;
@ -404,6 +413,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry()); return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry());
} }
ThreadPool getThreadPool() {
return nodeServicesProvider.getThreadPool();
}
private class StoreCloseListener implements Store.OnClose { private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId; private final ShardId shardId;
private final boolean ownsShard; private final boolean ownsShard;
@ -567,9 +580,21 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to refresh slowlog settings", e); logger.warn("failed to refresh slowlog settings", e);
} }
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
rescheduleRefreshTasks();
}
} }
} }
private void rescheduleRefreshTasks() {
try {
refreshTask.close();
} finally {
refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool());
}
}
public interface ShardStoreDeleter { public interface ShardStoreDeleter {
void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException; void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException;
@ -605,40 +630,165 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} }
} }
private void maybeRefreshEngine() {
/** if (indexSettings.getRefreshInterval().millis() > 0) {
* FSyncs the translog for all shards of this index in a defined interval. for (IndexShard shard : this.shards.values()) {
*/ switch (shard.state()) {
final static class AsyncTranslogFSync implements Runnable { case CREATED:
private final IndexService indexService; case RECOVERING:
private final ThreadPool threadPool; case CLOSED:
continue;
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) { case POST_RECOVERY:
this.indexService = indexService; case STARTED:
this.threadPool = threadPool; case RELOCATED:
} try {
shard.refresh("schedule");
boolean mustRun() { } catch (EngineClosedException | AlreadyClosedException ex) {
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done // fine - continue;
return (indexService.closed.get() || indexService.shards.isEmpty()) == false; }
} continue;
default:
void schedule() { throw new IllegalStateException("unknown state: " + shard.state());
threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this); }
}
@Override
public void run() {
if (mustRun()) {
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
indexService.maybeFSyncTranslogs();
if (mustRun()) {
schedule();
}
});
} }
} }
} }
static abstract class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
private final TimeValue interval;
private ScheduledFuture<?> scheduledFuture;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile Exception lastThrownException;
BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) {
this.indexService = indexService;
this.threadPool = threadPool;
this.interval = interval;
onTaskCompletion();
}
boolean mustReschedule() {
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
return (indexService.closed.get() || indexService.shards.isEmpty()) == false
&& closed.get() == false && interval.millis() > 0;
}
private synchronized void onTaskCompletion() {
if (mustReschedule()) {
indexService.logger.debug("scheduling {} every {}", toString(), interval);
this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this);
} else {
indexService.logger.debug("scheduled {} disabled", toString());
this.scheduledFuture = null;
}
}
public final void run() {
try {
runInternal();
} catch (Exception ex) {
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString());
lastThrownException = ex;
}
} finally {
onTaskCompletion();
}
}
private static boolean sameException(Exception left, Exception right) {
if (left.getClass() == right.getClass()) {
if ((left.getMessage() != null && left.getMessage().equals(right.getMessage()))
|| left.getMessage() == right.getMessage()) {
StackTraceElement[] stackTraceLeft = left.getStackTrace();
StackTraceElement[] stackTraceRight = right.getStackTrace();
if (stackTraceLeft.length == stackTraceRight.length) {
for (int i = 0; i < stackTraceLeft.length; i++) {
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
return false;
}
}
return true;
}
}
}
return false;
}
protected abstract void runInternal();
protected String getThreadPool() {
return ThreadPool.Names.SAME;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
FutureUtils.cancel(scheduledFuture);
scheduledFuture = null;
}
}
TimeValue getInterval() {
return interval;
}
boolean isClosed() {
return this.closed.get();
}
}
/**
* FSyncs the translog for all shards of this index in a defined interval.
*/
final static class AsyncTranslogFSync extends BaseAsyncTask {
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) {
super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval());
}
protected String getThreadPool() {
return ThreadPool.Names.FLUSH;
}
@Override
protected void runInternal() {
indexService.maybeFSyncTranslogs();
}
@Override
public String toString() {
return "translog_sync";
}
}
final class RefreshTasks extends BaseAsyncTask {
RefreshTasks(IndexService indexService, ThreadPool threadPool) {
super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval());
}
@Override
protected void runInternal() {
indexService.maybeRefreshEngine();
}
protected String getThreadPool() {
return ThreadPool.Names.REFRESH;
}
@Override
public String toString() {
return "refresh";
}
}
RefreshTasks getRefreshTask() { // for tests
return refreshTask;
}
} }

View File

@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.internal.AllFieldMapper; import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -35,6 +37,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -54,6 +57,8 @@ public final class IndexSettings {
public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields"; public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability"; public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
private final String uuid; private final String uuid;
private final List<Consumer<Settings>> updateListeners; private final List<Consumer<Settings>> updateListeners;
@ -76,6 +81,9 @@ public final class IndexSettings {
private final Predicate<String> indexNameMatcher; private final Predicate<String> indexNameMatcher;
private volatile Translog.Durability durability; private volatile Translog.Durability durability;
private final TimeValue syncInterval; private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
/** /**
* Returns the default search field for this index. * Returns the default search field for this index.
@ -156,7 +164,7 @@ public final class IndexSettings {
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name()); final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name());
this.durability = getFromSettings(settings, Translog.Durability.REQUEST); this.durability = getFromSettings(settings, Translog.Durability.REQUEST);
syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL);
assert indexNameMatcher.test(indexMetaData.getIndex()); assert indexNameMatcher.test(indexMetaData.getIndex());
} }
@ -346,10 +354,19 @@ public final class IndexSettings {
logger.info("updating durability from [{}] to [{}]", this.durability, durability); logger.info("updating durability from [{}] to [{}]", this.durability, durability);
this.durability = durability; this.durability = durability;
} }
TimeValue refreshInterval = settings.getAsTime(IndexSettings.INDEX_REFRESH_INTERVAL, this.refreshInterval);
if (!refreshInterval.equals(this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
this.refreshInterval = refreshInterval;
}
} }
public TimeValue getTranslogSyncInterval() { public TimeValue getTranslogSyncInterval() {
return syncInterval; return syncInterval;
} }
public TimeValue getRefreshInterval() {
return refreshInterval;
}
} }

View File

@ -84,7 +84,6 @@ public final class EngineConfig {
/** if set to true the engine will start even if the translog id in the commit point can not be found */ /** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
private static final String DEFAULT_CODEC_NAME = "default"; private static final String DEFAULT_CODEC_NAME = "default";

View File

@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -173,8 +172,6 @@ public class IndexShard extends AbstractIndexShardComponent {
* being indexed/deleted. */ * being indexed/deleted. */
private final AtomicLong writingBytes = new AtomicLong(); private final AtomicLong writingBytes = new AtomicLong();
private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture; private volatile ScheduledFuture<?> refreshScheduledFuture;
protected volatile ShardRouting shardRouting; protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state; protected volatile IndexShardState state;
@ -200,7 +197,6 @@ public class IndexShard extends AbstractIndexShardComponent {
*/ */
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close"; public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size"; public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
private final ShardPath path; private final ShardPath path;
@ -249,7 +245,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indexFieldDataService = indexFieldDataService; this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
state = IndexShardState.CREATED; state = IndexShardState.CREATED;
this.refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
this.path = path; this.path = path;
this.mergePolicyConfig = new MergePolicyConfig(logger, settings); this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
@ -561,23 +556,25 @@ public class IndexShard extends AbstractIndexShardComponent {
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
public void refresh(String source) { public void refresh(String source) {
verifyNotClosed(); verifyNotClosed();
if (canIndex()) { if (getEngine().refreshNeeded()) {
long bytes = getEngine().getIndexBufferRAMBytesUsed(); if (canIndex()) {
writingBytes.addAndGet(bytes); long bytes = getEngine().getIndexBufferRAMBytesUsed();
try { writingBytes.addAndGet(bytes);
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); try {
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
writingBytes.addAndGet(-bytes);
}
} else {
logger.debug("refresh with source [{}]", source);
long time = System.nanoTime(); long time = System.nanoTime();
getEngine().refresh(source); getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time); refreshMetric.inc(System.nanoTime() - time);
} finally {
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
writingBytes.addAndGet(-bytes);
} }
} else {
logger.debug("refresh with source [{}]", source);
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} }
} }
@ -954,7 +951,6 @@ public class IndexShard extends AbstractIndexShardComponent {
public void finalizeRecovery() { public void finalizeRecovery() {
recoveryState().setStage(RecoveryState.Stage.FINALIZE); recoveryState().setStage(RecoveryState.Stage.FINALIZE);
getEngine().refresh("recovery_finalization"); getEngine().refresh("recovery_finalization");
startScheduledTasksIfNeeded();
engineConfig.setEnableGcDeletes(true); engineConfig.setEnableGcDeletes(true);
} }
@ -1022,15 +1018,6 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
} }
private void startScheduledTasksIfNeeded() {
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
logger.debug("scheduling refresher every {}", refreshInterval);
} else {
logger.debug("scheduled refresher disabled");
}
}
/** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
public long getIndexBufferRAMBytesUsed() { public long getIndexBufferRAMBytesUsed() {
Engine engine = getEngineOrNull(); Engine engine = getEngineOrNull();
@ -1133,22 +1120,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.flushOnClose = flushOnClose; this.flushOnClose = flushOnClose;
} }
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
if (!refreshInterval.equals(this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
if (refreshScheduledFuture != null) {
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
this.refreshInterval = refreshInterval;
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
}
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) { if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
@ -1226,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
} }
private void handleRefreshException(Exception e) { void handleRefreshException(Exception e) {
if (e instanceof EngineClosedException) { if (e instanceof EngineClosedException) {
// ignore // ignore
} else if (e instanceof RefreshFailedEngineException) { } else if (e instanceof RefreshFailedEngineException) {
@ -1284,43 +1255,6 @@ public class IndexShard extends AbstractIndexShardComponent {
internalIndexingStats.noopUpdate(type); internalIndexingStats.noopUpdate(type);
} }
final class EngineRefresher implements Runnable {
@Override
public void run() {
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
if (!getEngine().refreshNeeded()) {
reschedule();
return;
}
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@Override
public void run() {
try {
// TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and
// reschedule if so...
if (getEngine().refreshNeeded()) {
refresh("schedule");
}
} catch (Exception e) {
handleRefreshException(e);
}
reschedule();
}
});
}
/**
* Schedules another (future) refresh, if refresh_interval is still enabled.
*/
private void reschedule() {
synchronized (mutex) {
if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this);
}
}
}
}
private void checkIndex() throws IOException { private void checkIndex() throws IOException {
if (store.tryIncRef()) { if (store.tryIncRef()) {

View File

@ -46,7 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests; import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
@ -383,7 +383,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
assertThat(source, Matchers.hasKey("foo")); assertThat(source, Matchers.hasKey("foo"));
assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
.put("refresh_interval", EngineConfig.DEFAULT_REFRESH_INTERVAL) .put("refresh_interval", IndexSettings.DEFAULT_REFRESH_INTERVAL)
.build())); .build()));
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -31,8 +32,13 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -151,4 +157,108 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build(); IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build();
service.updateMetaData(build); service.updateMetaData(build);
} }
public void testBaseAsyncTask() throws InterruptedException, IOException {
IndexService indexService = newIndexService();
ThreadPool pool = indexService.getThreadPool();
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
final AtomicInteger count = new AtomicInteger();
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) {
@Override
protected void runInternal() {
count.incrementAndGet();
assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]"));
latch.get().countDown();
try {
latch2.get().await();
} catch (InterruptedException e) {
fail("interrupted");
}
if (randomBoolean()) { // task can throw exceptions!!
if (randomBoolean()) {
throw new RuntimeException("foo");
} else {
throw new RuntimeException("bar");
}
}
}
@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}
};
latch.get().await();
latch.set(new CountDownLatch(1));
assertEquals(1, count.get());
latch2.get().countDown();
latch2.set(new CountDownLatch(1));
latch.get().await();
assertEquals(2, count.get());
task.close();
latch2.get().countDown();
assertEquals(2, count.get());
task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) {
@Override
protected void runInternal() {
}
};
assertTrue(task.mustReschedule());
if (randomBoolean()) {
for (Integer id : indexService.shardIds()) {
indexService.removeShard(id, "simon says");
}
} else {
indexService.close("simon says", false);
}
assertFalse("no shards left", task.mustReschedule());
}
public void testRefreshTaskIsUpdated() {
IndexService indexService = newIndexService();
IndexService.RefreshTasks refreshTask = indexService.getRefreshTask();
assertEquals(1000, refreshTask.getInterval().millis());
assertTrue(indexService.getRefreshTask().mustReschedule());
// now disable
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build();
indexService.updateMetaData(metaData);
assertNotSame(refreshTask, indexService.getRefreshTask());
assertTrue(refreshTask.isClosed());
assertFalse(indexService.getRefreshTask().mustReschedule());
// set it to 100ms
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "100ms")).build();
indexService.updateMetaData(metaData);
assertNotSame(refreshTask, indexService.getRefreshTask());
assertTrue(refreshTask.isClosed());
refreshTask = indexService.getRefreshTask();
assertTrue(refreshTask.mustReschedule());
assertEquals(100, refreshTask.getInterval().millis());
// set it to 200ms
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build();
indexService.updateMetaData(metaData);
assertNotSame(refreshTask, indexService.getRefreshTask());
assertTrue(refreshTask.isClosed());
refreshTask = indexService.getRefreshTask();
assertTrue(refreshTask.mustReschedule());
assertEquals(200, refreshTask.getInterval().millis());
// set it to 200ms again
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build();
indexService.updateMetaData(metaData);
assertSame(refreshTask, indexService.getRefreshTask());
assertTrue(indexService.getRefreshTask().mustReschedule());
assertFalse(refreshTask.isClosed());
assertEquals(200, refreshTask.getInterval().millis());
}
} }

View File

@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
@ -47,7 +47,7 @@ public class ParentFieldLoadingIT extends ESIntegTestCase {
private final Settings indexSettings = Settings.builder() private final Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexShard.INDEX_REFRESH_INTERVAL, -1) .put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)
// We never want merges in this test to ensure we have two segments for the last validation // We never want merges in this test to ensure we have two segments for the last validation
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.build(); .build();

View File

@ -81,7 +81,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL; import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;