Merge pull request #15933 from s1monw/move_refresh_into_index_service
Move RefreshTask into IndexService and use since task per index
This commit is contained in:
commit
13c0b1932b
|
@ -148,7 +148,7 @@ public class ClusterModule extends AbstractModule {
|
|||
registerIndexDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, Validator.EMPTY);
|
||||
registerIndexDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER);
|
||||
registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY);
|
||||
registerIndexDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME);
|
||||
registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME);
|
||||
registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY);
|
||||
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
|
||||
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
|
||||
|
|
|
@ -22,11 +22,11 @@ package org.elasticsearch.index;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -42,6 +42,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
|
@ -104,6 +106,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
private final IndexSettings indexSettings;
|
||||
private final IndexingSlowLog slowLog;
|
||||
private final IndexingOperationListener[] listeners;
|
||||
private volatile AsyncRefreshTask refreshTask;
|
||||
private final AsyncTranslogFSync fsyncTask;
|
||||
|
||||
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
|
||||
SimilarityService similarityService,
|
||||
|
@ -140,6 +144,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
this.listeners = new IndexingOperationListener[1+listenersIn.length];
|
||||
this.listeners[0] = slowLog;
|
||||
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
|
||||
// kick off async ops for the first shard in this index
|
||||
if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
|
||||
this.fsyncTask = new AsyncTranslogFSync(this);
|
||||
} else {
|
||||
this.fsyncTask = null;
|
||||
}
|
||||
this.refreshTask = new AsyncRefreshTask(this);
|
||||
}
|
||||
|
||||
public int numberOfShards() {
|
||||
|
@ -215,7 +226,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService);
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -306,14 +317,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
} else {
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
|
||||
}
|
||||
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
indexShard.updateRoutingEntry(routing, true);
|
||||
if (shards.isEmpty() && this.indexSettings.getTranslogSyncInterval().millis() != 0) {
|
||||
ThreadPool threadPool = nodeServicesProvider.getThreadPool();
|
||||
new AsyncTranslogFSync(this, threadPool).schedule(); // kick this off if we are the first shard in this service.
|
||||
}
|
||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||
success = true;
|
||||
return indexShard;
|
||||
|
@ -404,6 +410,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
return new QueryShardContext(indexSettings, nodeServicesProvider.getClient(), indexCache.bitsetFilterCache(), indexFieldData, mapperService(), similarityService(), nodeServicesProvider.getScriptService(), nodeServicesProvider.getIndicesQueriesRegistry());
|
||||
}
|
||||
|
||||
ThreadPool getThreadPool() {
|
||||
return nodeServicesProvider.getThreadPool();
|
||||
}
|
||||
|
||||
private class StoreCloseListener implements Store.OnClose {
|
||||
private final ShardId shardId;
|
||||
private final boolean ownsShard;
|
||||
|
@ -567,9 +577,21 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
} catch (Exception e) {
|
||||
logger.warn("failed to refresh slowlog settings", e);
|
||||
}
|
||||
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
|
||||
rescheduleRefreshTasks();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void rescheduleRefreshTasks() {
|
||||
try {
|
||||
refreshTask.close();
|
||||
} finally {
|
||||
refreshTask = new AsyncRefreshTask(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public interface ShardStoreDeleter {
|
||||
void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException;
|
||||
|
||||
|
@ -605,40 +627,170 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* FSyncs the translog for all shards of this index in a defined interval.
|
||||
*/
|
||||
final static class AsyncTranslogFSync implements Runnable {
|
||||
private final IndexService indexService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) {
|
||||
this.indexService = indexService;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
boolean mustRun() {
|
||||
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
|
||||
return (indexService.closed.get() || indexService.shards.isEmpty()) == false;
|
||||
}
|
||||
|
||||
void schedule() {
|
||||
threadPool.schedule(indexService.getIndexSettings().getTranslogSyncInterval(), ThreadPool.Names.SAME, AsyncTranslogFSync.this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (mustRun()) {
|
||||
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
|
||||
indexService.maybeFSyncTranslogs();
|
||||
if (mustRun()) {
|
||||
schedule();
|
||||
}
|
||||
});
|
||||
private void maybeRefreshEngine() {
|
||||
if (indexSettings.getRefreshInterval().millis() > 0) {
|
||||
for (IndexShard shard : this.shards.values()) {
|
||||
switch (shard.state()) {
|
||||
case CREATED:
|
||||
case RECOVERING:
|
||||
case CLOSED:
|
||||
continue;
|
||||
case POST_RECOVERY:
|
||||
case STARTED:
|
||||
case RELOCATED:
|
||||
try {
|
||||
shard.refresh("schedule");
|
||||
} catch (EngineClosedException | AlreadyClosedException ex) {
|
||||
// fine - continue;
|
||||
}
|
||||
continue;
|
||||
default:
|
||||
throw new IllegalStateException("unknown state: " + shard.state());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static abstract class BaseAsyncTask implements Runnable, Closeable {
|
||||
protected final IndexService indexService;
|
||||
protected final ThreadPool threadPool;
|
||||
private final TimeValue interval;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private volatile Exception lastThrownException;
|
||||
|
||||
BaseAsyncTask(IndexService indexService, TimeValue interval) {
|
||||
this.indexService = indexService;
|
||||
this.threadPool = indexService.getThreadPool();
|
||||
this.interval = interval;
|
||||
onTaskCompletion();
|
||||
}
|
||||
|
||||
boolean mustReschedule() {
|
||||
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
|
||||
return indexService.closed.get() == false
|
||||
&& closed.get() == false && interval.millis() > 0;
|
||||
}
|
||||
|
||||
private synchronized void onTaskCompletion() {
|
||||
if (mustReschedule()) {
|
||||
indexService.logger.debug("scheduling {} every {}", toString(), interval);
|
||||
this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this);
|
||||
} else {
|
||||
indexService.logger.debug("scheduled {} disabled", toString());
|
||||
this.scheduledFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isScheduled() {
|
||||
return scheduledFuture != null;
|
||||
}
|
||||
|
||||
public final void run() {
|
||||
try {
|
||||
runInternal();
|
||||
} catch (Exception ex) {
|
||||
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
|
||||
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
|
||||
indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString());
|
||||
lastThrownException = ex;
|
||||
}
|
||||
} finally {
|
||||
onTaskCompletion();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean sameException(Exception left, Exception right) {
|
||||
if (left.getClass() == right.getClass()) {
|
||||
if ((left.getMessage() != null && left.getMessage().equals(right.getMessage()))
|
||||
|| left.getMessage() == right.getMessage()) {
|
||||
StackTraceElement[] stackTraceLeft = left.getStackTrace();
|
||||
StackTraceElement[] stackTraceRight = right.getStackTrace();
|
||||
if (stackTraceLeft.length == stackTraceRight.length) {
|
||||
for (int i = 0; i < stackTraceLeft.length; i++) {
|
||||
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected abstract void runInternal();
|
||||
|
||||
protected String getThreadPool() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
FutureUtils.cancel(scheduledFuture);
|
||||
scheduledFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
TimeValue getInterval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
boolean isClosed() {
|
||||
return this.closed.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* FSyncs the translog for all shards of this index in a defined interval.
|
||||
*/
|
||||
final static class AsyncTranslogFSync extends BaseAsyncTask {
|
||||
|
||||
AsyncTranslogFSync(IndexService indexService) {
|
||||
super(indexService, indexService.getIndexSettings().getTranslogSyncInterval());
|
||||
}
|
||||
|
||||
protected String getThreadPool() {
|
||||
return ThreadPool.Names.FLUSH;
|
||||
}
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
indexService.maybeFSyncTranslogs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "translog_sync";
|
||||
}
|
||||
}
|
||||
|
||||
final class AsyncRefreshTask extends BaseAsyncTask {
|
||||
|
||||
AsyncRefreshTask(IndexService indexService) {
|
||||
super(indexService, indexService.getIndexSettings().getRefreshInterval());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
indexService.maybeRefreshEngine();
|
||||
}
|
||||
|
||||
protected String getThreadPool() {
|
||||
return ThreadPool.Names.REFRESH;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "refresh";
|
||||
}
|
||||
}
|
||||
|
||||
AsyncRefreshTask getRefreshTask() { // for tests
|
||||
return refreshTask;
|
||||
}
|
||||
|
||||
AsyncTranslogFSync getFsyncTask() { // for tests
|
||||
return fsyncTask;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -35,6 +37,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -54,6 +57,8 @@ public final class IndexSettings {
|
|||
public static final String ALLOW_UNMAPPED = "index.query.parse.allow_unmapped_fields";
|
||||
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
||||
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
|
||||
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
|
||||
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
|
||||
|
||||
private final String uuid;
|
||||
private final List<Consumer<Settings>> updateListeners;
|
||||
|
@ -76,6 +81,9 @@ public final class IndexSettings {
|
|||
private final Predicate<String> indexNameMatcher;
|
||||
private volatile Translog.Durability durability;
|
||||
private final TimeValue syncInterval;
|
||||
private volatile TimeValue refreshInterval;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Returns the default search field for this index.
|
||||
|
@ -156,7 +164,7 @@ public final class IndexSettings {
|
|||
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST.name());
|
||||
this.durability = getFromSettings(settings, Translog.Durability.REQUEST);
|
||||
syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
||||
|
||||
refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL);
|
||||
assert indexNameMatcher.test(indexMetaData.getIndex());
|
||||
}
|
||||
|
||||
|
@ -346,10 +354,19 @@ public final class IndexSettings {
|
|||
logger.info("updating durability from [{}] to [{}]", this.durability, durability);
|
||||
this.durability = durability;
|
||||
}
|
||||
|
||||
TimeValue refreshInterval = settings.getAsTime(IndexSettings.INDEX_REFRESH_INTERVAL, this.refreshInterval);
|
||||
if (!refreshInterval.equals(this.refreshInterval)) {
|
||||
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
|
||||
this.refreshInterval = refreshInterval;
|
||||
}
|
||||
}
|
||||
|
||||
public TimeValue getTranslogSyncInterval() {
|
||||
return syncInterval;
|
||||
}
|
||||
|
||||
public TimeValue getRefreshInterval() {
|
||||
return refreshInterval;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,7 +84,6 @@ public final class EngineConfig {
|
|||
/** if set to true the engine will start even if the translog id in the commit point can not be found */
|
||||
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
|
||||
|
||||
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
|
||||
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
|
||||
|
||||
private static final String DEFAULT_CODEC_NAME = "default";
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -173,8 +172,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
* being indexed/deleted. */
|
||||
private final AtomicLong writingBytes = new AtomicLong();
|
||||
|
||||
private TimeValue refreshInterval;
|
||||
|
||||
private volatile ScheduledFuture<?> refreshScheduledFuture;
|
||||
protected volatile ShardRouting shardRouting;
|
||||
protected volatile IndexShardState state;
|
||||
|
@ -200,7 +197,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
*/
|
||||
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
|
||||
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
|
||||
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
|
||||
|
||||
private final ShardPath path;
|
||||
|
||||
|
@ -249,7 +245,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
this.indexFieldDataService = indexFieldDataService;
|
||||
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
|
||||
state = IndexShardState.CREATED;
|
||||
this.refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
|
||||
this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
|
||||
this.path = path;
|
||||
this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
|
||||
|
@ -561,23 +556,25 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
|
||||
public void refresh(String source) {
|
||||
verifyNotClosed();
|
||||
if (canIndex()) {
|
||||
long bytes = getEngine().getIndexBufferRAMBytesUsed();
|
||||
writingBytes.addAndGet(bytes);
|
||||
try {
|
||||
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
|
||||
if (getEngine().refreshNeeded()) {
|
||||
if (canIndex()) {
|
||||
long bytes = getEngine().getIndexBufferRAMBytesUsed();
|
||||
writingBytes.addAndGet(bytes);
|
||||
try {
|
||||
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
|
||||
long time = System.nanoTime();
|
||||
getEngine().refresh(source);
|
||||
refreshMetric.inc(System.nanoTime() - time);
|
||||
} finally {
|
||||
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
|
||||
writingBytes.addAndGet(-bytes);
|
||||
}
|
||||
} else {
|
||||
logger.debug("refresh with source [{}]", source);
|
||||
long time = System.nanoTime();
|
||||
getEngine().refresh(source);
|
||||
refreshMetric.inc(System.nanoTime() - time);
|
||||
} finally {
|
||||
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
|
||||
writingBytes.addAndGet(-bytes);
|
||||
}
|
||||
} else {
|
||||
logger.debug("refresh with source [{}]", source);
|
||||
long time = System.nanoTime();
|
||||
getEngine().refresh(source);
|
||||
refreshMetric.inc(System.nanoTime() - time);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -954,7 +951,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
public void finalizeRecovery() {
|
||||
recoveryState().setStage(RecoveryState.Stage.FINALIZE);
|
||||
getEngine().refresh("recovery_finalization");
|
||||
startScheduledTasksIfNeeded();
|
||||
engineConfig.setEnableGcDeletes(true);
|
||||
}
|
||||
|
||||
|
@ -1022,15 +1018,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private void startScheduledTasksIfNeeded() {
|
||||
if (refreshInterval.millis() > 0) {
|
||||
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
|
||||
logger.debug("scheduling refresher every {}", refreshInterval);
|
||||
} else {
|
||||
logger.debug("scheduled refresher disabled");
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
|
||||
public long getIndexBufferRAMBytesUsed() {
|
||||
Engine engine = getEngineOrNull();
|
||||
|
@ -1133,22 +1120,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
this.flushOnClose = flushOnClose;
|
||||
}
|
||||
|
||||
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
|
||||
if (!refreshInterval.equals(this.refreshInterval)) {
|
||||
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
|
||||
if (refreshScheduledFuture != null) {
|
||||
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
|
||||
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
|
||||
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
|
||||
FutureUtils.cancel(refreshScheduledFuture);
|
||||
refreshScheduledFuture = null;
|
||||
}
|
||||
this.refreshInterval = refreshInterval;
|
||||
if (refreshInterval.millis() > 0) {
|
||||
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
|
||||
}
|
||||
}
|
||||
|
||||
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
|
||||
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
|
||||
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
|
||||
|
@ -1284,43 +1255,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
internalIndexingStats.noopUpdate(type);
|
||||
}
|
||||
|
||||
final class EngineRefresher implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
|
||||
if (!getEngine().refreshNeeded()) {
|
||||
reschedule();
|
||||
return;
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and
|
||||
// reschedule if so...
|
||||
if (getEngine().refreshNeeded()) {
|
||||
refresh("schedule");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
handleRefreshException(e);
|
||||
}
|
||||
|
||||
reschedule();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules another (future) refresh, if refresh_interval is still enabled.
|
||||
*/
|
||||
private void reschedule() {
|
||||
synchronized (mutex) {
|
||||
if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) {
|
||||
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkIndex() throws IOException {
|
||||
if (store.tryIncRef()) {
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Segment;
|
||||
import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
@ -383,7 +383,7 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
|
|||
assertThat(source, Matchers.hasKey("foo"));
|
||||
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
|
||||
.put("refresh_interval", EngineConfig.DEFAULT_REFRESH_INTERVAL)
|
||||
.put("refresh_interval", IndexSettings.DEFAULT_REFRESH_INTERVAL)
|
||||
.build()));
|
||||
}
|
||||
|
||||
|
|
|
@ -19,20 +19,29 @@
|
|||
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -151,4 +160,175 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
IndexMetaData build = IndexMetaData.builder(service.getMetaData()).putAlias(AliasMetaData.builder(alias).filter(filter).build()).build();
|
||||
service.updateMetaData(build);
|
||||
}
|
||||
|
||||
public void testBaseAsyncTask() throws InterruptedException, IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
|
||||
AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
|
||||
final AtomicInteger count = new AtomicInteger();
|
||||
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) {
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
count.incrementAndGet();
|
||||
assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]"));
|
||||
latch.get().countDown();
|
||||
try {
|
||||
latch2.get().await();
|
||||
} catch (InterruptedException e) {
|
||||
fail("interrupted");
|
||||
}
|
||||
if (randomBoolean()) { // task can throw exceptions!!
|
||||
if (randomBoolean()) {
|
||||
throw new RuntimeException("foo");
|
||||
} else {
|
||||
throw new RuntimeException("bar");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getThreadPool() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
};
|
||||
latch.get().await();
|
||||
latch.set(new CountDownLatch(1));
|
||||
assertEquals(1, count.get());
|
||||
latch2.get().countDown();
|
||||
latch2.set(new CountDownLatch(1));
|
||||
|
||||
latch.get().await();
|
||||
assertEquals(2, count.get());
|
||||
task.close();
|
||||
latch2.get().countDown();
|
||||
assertEquals(2, count.get());
|
||||
|
||||
|
||||
task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) {
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
|
||||
}
|
||||
};
|
||||
assertTrue(task.mustReschedule());
|
||||
indexService.close("simon says", false);
|
||||
assertFalse("no shards left", task.mustReschedule());
|
||||
assertTrue(task.isScheduled());
|
||||
task.close();
|
||||
assertFalse(task.isScheduled());
|
||||
}
|
||||
|
||||
public void testRefreshTaskIsUpdated() throws IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
|
||||
assertEquals(1000, refreshTask.getInterval().millis());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
// now disable
|
||||
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertNotSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
assertFalse(refreshTask.isScheduled());
|
||||
assertFalse(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
// set it to 100ms
|
||||
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "100ms")).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertNotSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
|
||||
refreshTask = indexService.getRefreshTask();
|
||||
assertTrue(refreshTask.mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertEquals(100, refreshTask.getInterval().millis());
|
||||
|
||||
// set it to 200ms
|
||||
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertNotSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
|
||||
refreshTask = indexService.getRefreshTask();
|
||||
assertTrue(refreshTask.mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertEquals(200, refreshTask.getInterval().millis());
|
||||
|
||||
// set it to 200ms again
|
||||
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "200ms")).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertFalse(refreshTask.isClosed());
|
||||
assertEquals(200, refreshTask.getInterval().millis());
|
||||
indexService.close("simon says", false);
|
||||
assertFalse(refreshTask.isScheduled());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
}
|
||||
|
||||
public void testFsyncTaskIsRunning() throws IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask();
|
||||
assertNotNull(fsyncTask);
|
||||
assertEquals(5000, fsyncTask.getInterval().millis());
|
||||
assertTrue(fsyncTask.mustReschedule());
|
||||
assertTrue(fsyncTask.isScheduled());
|
||||
|
||||
indexService.close("simon says", false);
|
||||
assertFalse(fsyncTask.isScheduled());
|
||||
assertTrue(fsyncTask.isClosed());
|
||||
}
|
||||
|
||||
public void testRefreshActuallyWorks() throws Exception {
|
||||
IndexService indexService = newIndexService();
|
||||
ensureGreen("test");
|
||||
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
|
||||
assertEquals(1000, refreshTask.getInterval().millis());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
// now disable
|
||||
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(0, search.totalHits);
|
||||
}
|
||||
// refresh every millisecond
|
||||
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "1ms")).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertBusy(() -> {
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(1, search.totalHits);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testAsyncFsyncActuallyWorks() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "10ms") // very often :)
|
||||
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)
|
||||
.build();
|
||||
IndexService indexService = createIndex("test", settings);
|
||||
ensureGreen("test");
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
assertBusy(() -> {
|
||||
assertFalse(shard.getTranslog().syncNeeded());
|
||||
});
|
||||
}
|
||||
|
||||
public void testNoFsyncTaskIfDisabled() {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "0ms") // disable
|
||||
.build();
|
||||
IndexService indexService = createIndex("test", settings);
|
||||
assertNull(indexService.getFsyncTask());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.MergePolicyConfig;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
@ -47,7 +47,7 @@ public class ParentFieldLoadingIT extends ESIntegTestCase {
|
|||
private final Settings indexSettings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexShard.INDEX_REFRESH_INTERVAL, -1)
|
||||
.put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)
|
||||
// We never want merges in this test to ensure we have two segments for the last validation
|
||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
|
||||
.build();
|
||||
|
|
|
@ -81,7 +81,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
|
|||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL;
|
||||
import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
|
||||
|
|
Loading…
Reference in New Issue