[CORE] Fold engine into IndexShard

This commit removes most of the Engine abstractions and removes
Engine exposure via dependency injection. It also removes the Holder
abstraction and makes the engine itself start at construction time.
It removes the start method from the engine entirely, which means no
engine instance exists until it has been started. There is also no way
to stop and restart the engine; an entirely new Engine must be created.
This commit is contained in:
Simon Willnauer 2015-01-07 16:32:32 +01:00
parent dedaf9387e
commit 959e3ca9da
30 changed files with 1061 additions and 1336 deletions

View File

@ -42,7 +42,6 @@ import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
import org.elasticsearch.index.cache.query.ShardQueryCacheModule;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineModule;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldDataModule;
import org.elasticsearch.index.gateway.IndexShardGatewayModule;
@ -302,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId));
modules.add(new IndexShardModule(shardId, indexSettings));
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
@ -316,7 +315,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
modules.add(new ShardBitsetFilterCacheModule());
modules.add(new ShardFieldDataModule());
modules.add(new TranslogModule(indexSettings));
modules.add(new EngineModule(indexSettings));
modules.add(new IndexShardGatewayModule());
modules.add(new PercolatorShardModule());
modules.add(new ShardTermVectorsModule());
@ -391,7 +389,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
closeInjectorResource(sId, shardInjector,
Engine.class,
MergeSchedulerProvider.class,
MergePolicyProvider.class,
IndexShardGatewayService.class,

View File

@ -48,7 +48,8 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
@ -233,7 +234,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
final class BitDocIdSetFilterWarmer extends IndicesWarmer.Listener {
@Override
public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) {
public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) {
if (!loadRandomAccessFiltersEagerly) {
return TerminationHandle.NO_WAIT;
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -49,33 +50,8 @@ import java.util.List;
*/
public interface Engine extends Closeable {
static final String INDEX_CODEC = "index.codec";
static ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");
ShardId shardId();
/**
* The default suggested refresh interval, -1 to disable it.
*/
TimeValue defaultRefreshInterval();
void enableGcDeletes(boolean enableGcDeletes);
void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
void addFailedEngineListener(FailedEngineListener listener);
/**
* Starts the Engine.
* <p/>
* <p>Note, after the creation and before the call to start, the store might
* be changed.
*/
void start() throws EngineException;
/** Stops the engine but allow to re-start it */
void stop() throws EngineException;
void create(Create create) throws EngineException;
void index(Index index) throws EngineException;
@ -127,6 +103,9 @@ public interface Engine extends Closeable {
*/
void forceMerge(boolean flush, boolean waitForMerge);
/**
* Triggers a forced merge on this engine
*/
void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
/**
@ -140,6 +119,8 @@ public interface Engine extends Closeable {
/** fail engine due to some error. the engine will also be closed. */
void failEngine(String reason, Throwable failure);
ByteSizeValue indexingBufferSize();
static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
}
@ -747,5 +728,4 @@ public interface Engine extends Closeable {
}
}
}
}

View File

@ -0,0 +1,447 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
/**
 * Holds all the configuration that is used to create an {@link Engine}.
 * Once an {@link Engine} has been created with this object, changes to this
 * object will affect the {@link Engine} instance.
 */
public final class EngineConfig {
    private final ShardId shardId;
    private volatile boolean failOnMergeFailure = true;
    private volatile boolean failEngineOnCorruption = true;
    private volatile ByteSizeValue indexingBufferSize;
    private volatile int indexConcurrency = IndexWriterConfig.DEFAULT_MAX_THREAD_STATES;
    private volatile boolean compoundOnFlush = true;
    // volatile: updated at runtime by EngineSettingsListener#onRefreshSettings and
    // read concurrently by the engine via getGcDeletesInMillis()
    private volatile long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
    private volatile boolean enableGcDeletes = true;
    private volatile String codecName = DEFAULT_CODEC_NAME;
    private final boolean optimizeAutoGenerateId;
    private final ThreadPool threadPool;
    private final ShardIndexingService indexingService;
    private final IndexSettingsService indexSettingsService;
    @Nullable
    private final IndicesWarmer warmer;
    private final Store store;
    private final SnapshotDeletionPolicy deletionPolicy;
    private final Translog translog;
    private final MergePolicyProvider mergePolicyProvider;
    private final MergeSchedulerProvider mergeScheduler;
    private final Analyzer analyzer;
    private final Similarity similarity;
    private final CodecService codecService;
    private final Engine.FailedEngineListener failedEngineListener;

    /**
     * Index setting for index concurrency / number of threadstates in the indexwriter.
     * The default depends on the number of CPUs in the system. We use 0.65 times the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
     * This setting is realtime updateable
     */
    public static final String INDEX_CONCURRENCY_SETTING = "index.index_concurrency";

    /**
     * Index setting for compound file on flush. This setting is realtime updateable.
     */
    public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";

    /**
     * Setting to control auto generated ID optimizations. Default is <code>true</code> if not present.
     * This setting is <b>not</b> realtime updateable.
     */
    public static final String INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING = "index.optimize_auto_generated_id";

    /**
     * Index setting to enable / disable deletes garbage collection.
     * This setting is realtime updateable
     */
    public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";

    /**
     * Index setting to enable / disable engine failures on merge exceptions. Default is <code>true</code> / <tt>enabled</tt>.
     * This setting is realtime updateable.
     */
    public static final String INDEX_FAIL_ON_MERGE_FAILURE_SETTING = "index.fail_on_merge_failure";

    /**
     * Index setting to enable / disable engine failures on detected index corruptions. Default is <code>true</code> / <tt>enabled</tt>.
     * This setting is realtime updateable.
     */
    public static final String INDEX_FAIL_ON_CORRUPTION_SETTING = "index.fail_on_corruption";

    /**
     * Index setting to control the initial index buffer size.
     * This setting is <b>not</b> realtime updateable.
     */
    public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size";

    /**
     * Index setting to change the low level lucene codec used for writing new segments.
     * This setting is realtime updateable.
     */
    public static final String INDEX_CODEC_SETTING = "index.codec";

    public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
    public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
    // NOTE: constant name misspells "DEFAULT" but is part of the public API — do not rename without a deprecation cycle
    public static final ByteSizeValue DEFAUTL_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
    public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");

    private static final String DEFAULT_CODEC_NAME = "default";

    /**
     * Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
     */
    public EngineConfig(ShardId shardId, boolean optimizeAutoGenerateId, ThreadPool threadPool, ShardIndexingService indexingService, IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener) {
        this.shardId = shardId;
        this.optimizeAutoGenerateId = optimizeAutoGenerateId;
        this.threadPool = threadPool;
        this.indexingService = indexingService;
        this.indexSettingsService = indexSettingsService;
        this.warmer = warmer;
        this.store = store;
        this.deletionPolicy = deletionPolicy;
        this.translog = translog;
        this.mergePolicyProvider = mergePolicyProvider;
        this.mergeScheduler = mergeScheduler;
        this.analyzer = analyzer;
        this.similarity = similarity;
        this.codecService = codecService;
        this.failedEngineListener = failedEngineListener;
        // seed the mutable settings from the current index settings; realtime-updateable
        // ones are kept in sync afterwards via EngineSettingsListener
        Settings indexSettings = indexSettingsService.getSettings();
        this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
        this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
        codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
        indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, DEFAUTL_INDEX_BUFFER_SIZE);
        failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION_SETTING, true);
        failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE_SETTING, true);
        gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
    }

    /**
     * Sets the indexing buffer
     */
    public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
        this.indexingBufferSize = indexingBufferSize;
    }

    /**
     * Sets the index concurrency
     * @see #getIndexConcurrency()
     */
    public void setIndexConcurrency(int indexConcurrency) {
        this.indexConcurrency = indexConcurrency;
    }

    /**
     * Enables / disables gc deletes
     *
     * @see #isEnableGcDeletes()
     */
    public void setEnableGcDeletes(boolean enableGcDeletes) {
        this.enableGcDeletes = enableGcDeletes;
    }

    /**
     * Returns <code>true</code> iff the engine should be failed if a merge error is hit. Defaults to <code>true</code>
     */
    public boolean isFailOnMergeFailure() {
        return failOnMergeFailure;
    }

    /**
     * Returns <code>true</code> if the engine should be failed in the case of a corrupted index. Defaults to <code>true</code>
     */
    public boolean isFailEngineOnCorruption() {
        return failEngineOnCorruption;
    }

    /**
     * Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link org.elasticsearch.indices.memory.IndexingMemoryController}
     */
    public ByteSizeValue getIndexingBufferSize() {
        return indexingBufferSize;
    }

    /**
     * Returns the index concurrency that directly translates into the number of thread states used in the engines
     * {@code IndexWriter}.
     *
     * @see org.apache.lucene.index.IndexWriterConfig#getMaxThreadStates()
     */
    public int getIndexConcurrency() {
        return indexConcurrency;
    }

    /**
     * Returns <code>true</code> iff flushed segments should be written as compound file system. Defaults to <code>true</code>
     */
    public boolean isCompoundOnFlush() {
        return compoundOnFlush;
    }

    /**
     * Returns the GC deletes cycle in milliseconds.
     */
    public long getGcDeletesInMillis() {
        return gcDeletesInMillis;
    }

    /**
     * Returns <code>true</code> iff delete garbage collection in the engine should be enabled. This setting is updateable
     * in realtime and forces a volatile read. Consumers can safely read this value directly to fetch its latest value. The default is <code>true</code>
     * <p>
     * Engine GC deletion if enabled collects deleted documents from in-memory realtime data structures after a certain amount of
     * time ({@link #getGcDeletesInMillis()}) if enabled. Before deletes are GCed they will cause re-adding the document that was deleted
     * to fail.
     * </p>
     */
    public boolean isEnableGcDeletes() {
        return enableGcDeletes;
    }

    /**
     * Returns the {@link Codec} used in the engines {@link org.apache.lucene.index.IndexWriter}
     * <p>
     * Note: this settings is only read on startup and if a new writer is created. This happens either due to a
     * settings change in the {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} or if
     * {@link Engine#flush(org.elasticsearch.index.engine.Engine.FlushType, boolean, boolean)} with {@link org.elasticsearch.index.engine.Engine.FlushType#NEW_WRITER} is executed.
     * </p>
     */
    public Codec getCodec() {
        return codecService.codec(codecName);
    }

    /**
     * Returns <code>true</code> iff documents with auto-generated IDs are optimized if possible. This mainly means that
     * they are simply appended to the index if no update call is necessary.
     */
    public boolean isOptimizeAutoGenerateId() {
        return optimizeAutoGenerateId;
    }

    /**
     * Returns a thread-pool mainly used to get estimated time stamps from {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
     * async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#OPTIMIZE} thread-pool
     */
    public ThreadPool getThreadPool() {
        return threadPool;
    }

    /**
     * Returns a {@link org.elasticsearch.index.indexing.ShardIndexingService} used inside the engine to inform about
     * pre and post index and create operations. The operations are used for statistic purposes etc.
     *
     * @see org.elasticsearch.index.indexing.ShardIndexingService#postCreate(org.elasticsearch.index.engine.Engine.Create)
     * @see org.elasticsearch.index.indexing.ShardIndexingService#preCreate(org.elasticsearch.index.engine.Engine.Create)
     *
     */
    public ShardIndexingService getIndexingService() {
        return indexingService;
    }

    /**
     * Returns an {@link org.elasticsearch.index.settings.IndexSettingsService} used to register a {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} instance
     * in order to get notification for realtime changeable settings exposed in this {@link org.elasticsearch.index.engine.EngineConfig}.
     */
    public IndexSettingsService getIndexSettingsService() {
        return indexSettingsService;
    }

    /**
     * Returns an {@link org.elasticsearch.indices.IndicesWarmer} used to warm new searchers before they are used for searching.
     * Note: This method might return <code>null</code>
     */
    @Nullable
    public IndicesWarmer getWarmer() {
        return warmer;
    }

    /**
     * Returns the {@link org.elasticsearch.index.store.Store} instance that provides access to the {@link org.apache.lucene.store.Directory}
     * used for the engines {@link org.apache.lucene.index.IndexWriter} to write its index files to.
     * <p>
     * Note: In order to use this instance the consumer needs to increment the stores reference before it's used the first time and hold
     * its reference until it's not needed anymore.
     * </p>
     */
    public Store getStore() {
        return store;
    }

    /**
     * Returns a {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy} used in the engines
     * {@link org.apache.lucene.index.IndexWriter}.
     */
    public SnapshotDeletionPolicy getDeletionPolicy() {
        return deletionPolicy;
    }

    /**
     * Returns a {@link Translog instance}
     */
    public Translog getTranslog() {
        return translog;
    }

    /**
     * Returns the {@link org.elasticsearch.index.merge.policy.MergePolicyProvider} used to obtain
     * a {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
     */
    public MergePolicyProvider getMergePolicyProvider() {
        return mergePolicyProvider;
    }

    /**
     * Returns the {@link org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider} used to obtain
     * a {@link org.apache.lucene.index.MergeScheduler} for the engines {@link org.apache.lucene.index.IndexWriter}
     */
    public MergeSchedulerProvider getMergeScheduler() {
        return mergeScheduler;
    }

    /**
     * Returns a listener that should be called on engine failure
     */
    public Engine.FailedEngineListener getFailedEngineListener() {
        return failedEngineListener;
    }

    /**
     * Returns the latest index settings directly from the index settings service.
     */
    public Settings getIndexSettings() {
        return indexSettingsService.getSettings();
    }

    /**
     * Returns the engines shard ID
     */
    public ShardId getShardId() { return shardId; }

    /**
     * Returns the analyzer as the default analyzer in the engines {@link org.apache.lucene.index.IndexWriter}
     */
    public Analyzer getAnalyzer() {
        return analyzer;
    }

    /**
     * Returns the {@link org.apache.lucene.search.similarities.Similarity} used for indexing and searching.
     */
    public Similarity getSimilarity() {
        return similarity;
    }

    /**
     * Basic realtime updateable settings listener that can be used to receive notification
     * if an index setting changed.
     */
    public static abstract class EngineSettingsListener implements IndexSettingsService.Listener {

        private final ESLogger logger;
        private final EngineConfig config;

        public EngineSettingsListener(ESLogger logger, EngineConfig config) {
            this.logger = logger;
            this.config = config;
        }

        @Override
        public final void onRefreshSettings(Settings settings) {
            boolean change = false;
            long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
            if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
                config.gcDeletesInMillis = gcDeletesInMillis;
                change = true;
            }
            final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
            if (compoundOnFlush != config.isCompoundOnFlush()) {
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
                config.compoundOnFlush = compoundOnFlush;
                change = true;
            }
            final boolean failEngineOnCorruption = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption());
            if (failEngineOnCorruption != config.isFailEngineOnCorruption()) {
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption(), failEngineOnCorruption);
                config.failEngineOnCorruption = failEngineOnCorruption;
                change = true;
            }
            int indexConcurrency = settings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, config.getIndexConcurrency());
            if (indexConcurrency != config.getIndexConcurrency()) {
                // use the shared constant so the logged key stays consistent with the other branches
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_CONCURRENCY_SETTING, config.getIndexConcurrency(), indexConcurrency);
                config.setIndexConcurrency(indexConcurrency);
                // we have to flush in this case, since it only applies on a new index writer
                change = true;
            }
            final String codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, config.codecName);
            if (!codecName.equals(config.codecName)) {
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_CODEC_SETTING, config.codecName, codecName);
                config.codecName = codecName;
                // we want to flush in this case, so the new codec will be reflected right away...
                change = true;
            }
            final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
            if (failOnMergeFailure != config.isFailOnMergeFailure()) {
                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);
                config.failOnMergeFailure = failOnMergeFailure;
                change = true;
            }
            if (change) {
                onChange();
            }
        }

        /**
         * This method is called if any of the settings that are exposed as realtime updatable settings has changed.
         * This method should be overridden by subclasses to react on settings changes.
         */
        protected abstract void onChange();
    }
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
/**
 * Factory for creating new {@link Engine} instances from a given {@link EngineConfig}.
 */
public interface EngineFactory {

    /**
     * Creates and starts a new engine based on the given configuration.
     *
     * @param config the configuration the new engine is built from
     * @return a new, already-started {@link Engine} instance
     */
    Engine newEngine(EngineConfig config);
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.internal.InternalEngineModule;
/**
 * Guice module that spawns the engine-implementation module configured for an
 * index via the {@code index.engine.type} setting, defaulting to the internal engine.
 */
public class EngineModule extends AbstractModule implements SpawnModules {

    public static final String ENGINE_TYPE = "index.engine.type";

    public static final Class<? extends Module> DEFAULT_ENGINE = InternalEngineModule.class;

    private final Settings settings;

    public EngineModule(Settings settings) {
        this.settings = settings;
    }

    @Override
    public Iterable<? extends Module> spawnModules() {
        // resolve the module class from the settings (falling back to the default engine)
        // and instantiate it with the index settings
        final Class<? extends Module> engineModuleClass = settings.getAsClass(ENGINE_TYPE, DEFAULT_ENGINE,
                "org.elasticsearch.index.engine.", "EngineModule");
        return ImmutableList.of(Modules.createModule(engineModuleClass, settings));
    }

    @Override
    protected void configure() {
        // nothing to bind directly; all bindings come from the spawned module
    }
}

View File

@ -39,17 +39,15 @@ import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
@ -61,14 +59,11 @@ import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.*;
@ -88,34 +83,22 @@ public class InternalEngine implements Engine {
protected final ESLogger logger;
protected final ShardId shardId;
private volatile boolean failEngineOnCorruption;
private volatile ByteSizeValue indexingBufferSize;
private volatile int indexConcurrency;
private volatile boolean compoundOnFlush;
private volatile long gcDeletesInMillis;
private final EngineConfig engineConfig;
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final MergeSchedulerListener mergeSchedulerListener;
private final EngineConfig.EngineSettingsListener listener;
/** When we last pruned expired tombstones from versionMap.deletes: */
private volatile long lastDeleteVersionPruneTimeMSec;
private volatile boolean enableGcDeletes = true;
private volatile String codecName;
private final boolean optimizeAutoGenerateId;
private final ThreadPool threadPool;
private final ShardIndexingService indexingService;
@Nullable
private final InternalIndicesWarmer warmer;
private final IndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final AnalysisService analysisService;
private final SimilarityService similarityService;
private final CodecService codecService;
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final InternalLock readLock = new InternalLock(rwl.readLock());
@ -127,13 +110,9 @@ public class InternalEngine implements Engine {
private volatile SearcherManager searcherManager;
private volatile boolean closed = false;
private volatile Closeable storeReference;
// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
private volatile boolean possibleMergeNeeded = false;
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
// we use flushNeeded here, since if there are no changes, then the commit won't write
// will not really happen, and then the commitUserData and the new translog will not be reflected
@ -163,64 +142,60 @@ public class InternalEngine implements Engine {
private final IndexThrottle throttle;
public InternalEngine(ShardId shardId, ESLogger logger, CodecService codecService, ThreadPool threadPool,
ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService,
boolean enableGcDeletes, long gcDeletesInMillis, ByteSizeValue indexingBufferSize, String codecName,
boolean compoundOnFlush, int indexConcurrency, boolean optimizeAutoGenerateId, boolean failEngineOnCorruption,
FailedEngineListener failedEngineListener) throws EngineException {
Preconditions.checkNotNull(store, "Store must be provided to the engine");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
public InternalEngine(EngineConfig engineConfig) throws EngineException {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");
this.shardId = shardId;
this.logger = logger;
this.gcDeletesInMillis = gcDeletesInMillis;
this.enableGcDeletes = enableGcDeletes;
this.indexingBufferSize = indexingBufferSize;
this.codecName = codecName;
this.threadPool = threadPool;
this.lastDeleteVersionPruneTimeMSec = threadPool.estimatedTimeInMillis();
this.indexingService = indexingService;
this.warmer = (InternalIndicesWarmer) warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.translog = translog;
this.mergePolicyProvider = mergePolicyProvider;
this.mergeScheduler = mergeScheduler;
this.analysisService = analysisService;
this.similarityService = similarityService;
this.codecService = codecService;
this.compoundOnFlush = compoundOnFlush;
this.indexConcurrency = indexConcurrency;
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.store = engineConfig.getStore();
this.deletionPolicy = engineConfig.getDeletionPolicy();
this.translog = engineConfig.getTranslog();
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
this.mergeScheduler = engineConfig.getMergeScheduler();
this.versionMap = new LiveVersionMap();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
this.optimizeAutoGenerateId = optimizeAutoGenerateId;
this.failEngineOnCorruption = failEngineOnCorruption;
this.failedEngineListener = failedEngineListener;
this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
this.mergeSchedulerListener = new MergeSchedulerListener();
this.mergeScheduler.addListener(mergeSchedulerListener);
this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.engineConfig = engineConfig;
listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) {
@Override
protected void onChange() {
updateSettings();
}
};
engineConfig.getIndexSettingsService().addListener(listener);
final IndexWriter writer = start();
assert indexWriter == null : "IndexWriter already initialized";
indexWriter = writer;
}
@Override
public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
ByteSizeValue preValue = this.indexingBufferSize;
ByteSizeValue preValue = engineConfig.getIndexingBufferSize();
try (InternalLock _ = readLock.acquire()) {
this.indexingBufferSize = indexingBufferSize;
IndexWriter indexWriter = this.indexWriter;
if (indexWriter != null) {
indexWriter.getConfig().setRAMBufferSizeMB(this.indexingBufferSize.mbFrac());
}
ensureOpen();
engineConfig.setIndexingBufferSize(indexingBufferSize);
indexWriter.getConfig().setRAMBufferSizeMB(indexingBufferSize.mbFrac());
}
if (preValue.bytes() != indexingBufferSize.bytes()) {
// its inactive, make sure we do a full flush in this case, since the memory
// changes only after a "data" change has happened to the writer
if (indexingBufferSize == Engine.INACTIVE_SHARD_INDEXING_BUFFER && preValue != Engine.INACTIVE_SHARD_INDEXING_BUFFER) {
if (indexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize);
try {
flush(FlushType.COMMIT, false, false);
@ -237,50 +212,20 @@ public class InternalEngine implements Engine {
}
}
@Override
public void addFailedEngineListener(FailedEngineListener listener) {
throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngine. Use InternalEngineHolder.");
}
@Override
public void start() throws EngineException {
private IndexWriter start() throws EngineException {
store.incRef();
/*
* This might look weird but it's in-fact needed since if we close
* the engine due to a corruption on IW startup the reference is decremented in the close
* method and this must not happen more than once
*/
final Closeable storeRef = new Closeable() {
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
store.decRef();
}
}
};
final List<Closeable> closeOnFailure = new ArrayList<>(Arrays.asList(storeRef));
try (InternalLock _ = writeLock.acquire()) {
IndexWriter indexWriter = this.indexWriter;
if (indexWriter != null) {
throw new EngineAlreadyStartedException(shardId);
}
if (closed) {
throw new EngineClosedException(shardId);
}
storeReference = storeRef;
if (logger.isDebugEnabled()) {
logger.debug("starting engine");
}
boolean success = false;
IndexWriter indexWriter = null;
SearcherManager searcherManager = null;
try {
try {
indexWriter = createWriter();
closeOnFailure.add(indexWriter);
} catch (IOException e) {
maybeFailEngine(e, "start");
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
try {
assert indexWriter != null;
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final long translogId = Math.max(0, translog.findLargestPresentTranslogId()) + 1;
@ -293,96 +238,46 @@ public class InternalEngine implements Engine {
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
}
searcherManager = buildSearchManager(indexWriter);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translog.newTranslog(translogId);
final SearcherManager searcherManager = buildSearchManager(indexWriter);
closeOnFailure.add(searcherManager);
versionMap.setManager(searcherManager);
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
this.searcherManager = searcherManager;
translogIdGenerator.set(translogId);
this.indexWriter = indexWriter;
closeOnFailure.clear(); // all is well
success = true;
return indexWriter;
} catch (IOException e) {
maybeFailEngine(e, "start");
try {
if (indexWriter != null) {
indexWriter.rollback();
}
indexWriter.rollback();
} catch (IOException e1) { // iw is closed below
e.addSuppressed(e1);
}
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
} finally {
if (closeOnFailure.isEmpty() == false) { // release everything we created on a failure
IOUtils.closeWhileHandlingException(closeOnFailure);
if (success == false) { // release everything we created on a failure
store.decRef();
IOUtils.closeWhileHandlingException(indexWriter, searcherManager);
}
}
}
private void updateSettings() {
if (closed == false) {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
final boolean concurrencyNeedsUpdate = iwc.getMaxThreadStates() != engineConfig.getIndexConcurrency();
final boolean codecNeedsUpdate = iwc.getCodec().equals(engineConfig.getCodec()) == false;
if (codecNeedsUpdate || concurrencyNeedsUpdate) {
flush(FlushType.NEW_WRITER, false, false);
}
}
}
@Override
public void stop() throws EngineException {
throw new UnsupportedOperationException("stop() is not supported by InternalEngine. Use InternalEngineHolder.");
}
@Override
public ShardId shardId() {
return shardId;
}
@Override
public TimeValue defaultRefreshInterval() {
return InternalEngineHolder.DEFAULT_REFRESH_INTERVAL;
}
/** return the current indexing buffer size setting * */
public ByteSizeValue indexingBufferSize() {
return indexingBufferSize;
}
@Override
public void enableGcDeletes(boolean enableGcDeletes) {
this.enableGcDeletes = enableGcDeletes;
}
public void updateSettings(final long gcDeletesInMillis, final boolean compoundOnFlush, boolean failEngineOnCorruption, final int indexConcurrency, final String codecName) {
ensureOpen();
if (this.gcDeletesInMillis != gcDeletesInMillis) {
logger.trace("[impl] updating gcDeletesInMillis from [{}] to [{}]", this.gcDeletesInMillis, gcDeletesInMillis);
this.gcDeletesInMillis = gcDeletesInMillis;
}
if (this.compoundOnFlush != compoundOnFlush) {
this.compoundOnFlush = compoundOnFlush;
logger.trace("[impl] updating compoundOnFlush from [{}] to [{}]", this.compoundOnFlush, compoundOnFlush);
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
}
if (this.failEngineOnCorruption != failEngineOnCorruption) {
logger.trace("[impl] updating failEngineOnCorruption from [{}] to [{}]", this.failEngineOnCorruption, failEngineOnCorruption);
this.failEngineOnCorruption = failEngineOnCorruption;
}
if (indexConcurrency != this.indexConcurrency || !codecName.equals(this.codecName)) {
boolean requiresFlushing = false;
try (InternalLock _ = readLock.acquire()) {
if (indexConcurrency != this.indexConcurrency) {
logger.trace("[impl] updating indexConcurrency from [{}] to [{}]", this.indexConcurrency, indexConcurrency);
this.indexConcurrency = indexConcurrency;
// we have to flush in this case, since it only applies on a new index writer
requiresFlushing = true;
}
if (!codecName.equals(this.codecName)) {
logger.trace("[impl] updating codecName from [{}] to [{}]", this.codecName, codecName);
this.codecName = codecName;
// we want to flush in this case, so the new codec will be reflected right away...
requiresFlushing = true;
}
} finally {
if (requiresFlushing) {
flush(FlushType.NEW_WRITER, false, false);
}
}
}
return engineConfig.getIndexingBufferSize();
}
@Override
@ -416,7 +311,7 @@ public class InternalEngine implements Engine {
} catch (Throwable e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here
throw new EngineException(shardId(), "Couldn't resolve version", e);
throw new EngineException(shardId, "Couldn't resolve version", e);
}
if (docIdAndVersion != null) {
@ -446,7 +341,6 @@ public class InternalEngine implements Engine {
innerCreate(create, writer);
}
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t, "create");
@ -457,7 +351,7 @@ public class InternalEngine implements Engine {
private void innerCreate(Create create, IndexWriter writer) throws IOException {
if (optimizeAutoGenerateId && create.autoGeneratedId() && !create.canHaveDuplicates()) {
if (engineConfig.isOptimizeAutoGenerateId() && create.autoGeneratedId() && !create.canHaveDuplicates()) {
// We don't need to lock because this ID cannot be concurrently updated:
innerCreateNoLock(create, writer, Versions.NOT_FOUND, null);
} else {
@ -468,7 +362,7 @@ public class InternalEngine implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(create.uid());
} else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -551,7 +445,6 @@ public class InternalEngine implements Engine {
innerIndex(index, writer);
}
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t, "index");
@ -565,14 +458,14 @@ public class InternalEngine implements Engine {
*/
private void checkVersionMapRefresh() {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsedForRefresh() > 0.25 * indexingBufferSize.bytes() && versionMapRefreshPending.getAndSet(true) == false) {
if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (closed) {
// no point...
return;
}
// Now refresh to clear versionMap:
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
public void run() {
try {
refresh("version_table_full", false);
@ -594,7 +487,7 @@ public class InternalEngine implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -642,13 +535,9 @@ public class InternalEngine implements Engine {
@Override
public void delete(Delete delete) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId, failedEngine);
}
innerDelete(delete, writer);
final IndexWriter indexWriter = currentIndexWriter();
innerDelete(delete, indexWriter);
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t, "delete");
@ -661,7 +550,7 @@ public class InternalEngine implements Engine {
private void maybePruneDeletedTombstones() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (enableGcDeletes && threadPool.estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > gcDeletesInMillis * 0.25) {
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > engineConfig.getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
@ -673,7 +562,7 @@ public class InternalEngine implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid());
} else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -705,7 +594,7 @@ public class InternalEngine implements Engine {
delete.updateVersion(updatedVersion, found);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation));
indexingService.postDeleteUnderLock(delete);
}
@ -714,11 +603,7 @@ public class InternalEngine implements Engine {
@Override
public void delete(DeleteByQuery delete) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId);
}
final IndexWriter indexWriter = currentIndexWriter();
Query query;
if (delete.nested() && delete.aliasFilter() != null) {
query = new IncludeNestedDocsQuery(new FilteredQuery(delete.query(), delete.aliasFilter()), delete.parentFilter());
@ -730,10 +615,9 @@ public class InternalEngine implements Engine {
query = delete.query();
}
writer.deleteDocuments(query);
indexWriter.deleteDocuments(query);
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (Throwable t) {
maybeFailEngine(t, "delete_by_query");
@ -753,18 +637,8 @@ public class InternalEngine implements Engine {
* the searcher is acquired. */
store.incRef();
try {
SearcherManager manager = this.searcherManager;
if (manager == null) {
ensureOpen();
try (InternalLock _ = this.readLock.acquire()) {
// we might start up right now and the searcherManager is not initialized
// we take the read lock and retry again since write lock is taken
// while start() is called and otherwise the ensureOpen() call will
// barf.
manager = this.searcherManager;
assert manager != null : "SearcherManager is null but shouldn't";
}
}
final SearcherManager manager = this.searcherManager; // can never be null
assert manager != null : "SearcherManager is null";
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
@ -818,21 +692,9 @@ public class InternalEngine implements Engine {
return false;
}
private boolean possibleMergeNeeded() {
IndexWriter writer = this.indexWriter;
if (writer == null) {
return false;
}
// a merge scheduler might bail without going through all its pending merges
// so make sure we also check if there are pending merges
return this.possibleMergeNeeded || writer.hasPendingMerges();
}
@Override
public void refresh(String source, boolean force) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
ensureOpen();
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (InternalLock _ = readLock.acquire()) {
@ -957,7 +819,7 @@ public class InternalEngine implements Engine {
// We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
// (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
if (enableGcDeletes) {
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
@ -981,7 +843,7 @@ public class InternalEngine implements Engine {
// We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
// (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
if (enableGcDeletes) {
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
@ -1011,7 +873,7 @@ public class InternalEngine implements Engine {
}
private void ensureOpen() {
if (indexWriter == null) {
if (closed) {
throw new EngineClosedException(shardId, failedEngine);
}
}
@ -1024,13 +886,14 @@ public class InternalEngine implements Engine {
private IndexWriter currentIndexWriter() {
final IndexWriter writer = indexWriter;
if (writer == null) {
assert closed : "Engine is not closed but writer is null";
throw new EngineClosedException(shardId, failedEngine);
}
return writer;
}
private void pruneDeletedTombstones() {
long timeMSec = threadPool.estimatedTimeInMillis();
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...
@ -1039,10 +902,10 @@ public class InternalEngine implements Engine {
BytesRef uid = entry.getKey();
synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.time() > gcDeletesInMillis) {
if (timeMSec - versionValue.time() > engineConfig.getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
@ -1052,19 +915,6 @@ public class InternalEngine implements Engine {
lastDeleteVersionPruneTimeMSec = timeMSec;
}
private void maybeMerge() throws EngineException {
if (!possibleMergeNeeded()) {
return;
}
possibleMergeNeeded = false;
try (InternalLock _ = readLock.acquire()) {
currentIndexWriter().maybeMerge();
} catch (Throwable t) {
maybeFailEngine(t, "maybe_merge");
throw new OptimizeFailedEngineException(shardId, t);
}
}
// TODO: can we please remove this method?!
private void waitForMerges(boolean flushAfter) {
try {
@ -1107,7 +957,6 @@ public class InternalEngine implements Engine {
writer.forceMergeDeletes(false);
} else if (maxNumSegments <= 0) {
writer.maybeMerge();
possibleMergeNeeded = false;
} else {
writer.forceMerge(maxNumSegments, false);
}
@ -1124,7 +973,7 @@ public class InternalEngine implements Engine {
waitForMerges(flush);
} else if (flush) {
// we only need to monitor merges for async calls if we are going to flush
threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("Exception while waiting for merges asynchronously after optimize", t);
@ -1214,11 +1063,11 @@ public class InternalEngine implements Engine {
private boolean maybeFailEngine(Throwable t, String source) {
if (Lucene.isCorruptionException(t)) {
if (this.failEngineOnCorruption) {
if (engineConfig.isFailEngineOnCorruption()) {
failEngine("corrupt file detected source: [" + source + "]", t);
return true;
} else {
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
}
} else if (ExceptionsHelper.isOOM(t)) {
failEngine("out of memory", t);
@ -1233,7 +1082,7 @@ public class InternalEngine implements Engine {
}
return t;
}
private long guardedRamBytesUsed(Accountable a) {
if (a == null) {
return 0;
@ -1379,8 +1228,11 @@ public class InternalEngine implements Engine {
} catch (Throwable e) {
logger.warn("failed to rollback writer on close", e);
} finally {
store.decRef();
indexWriter = null;
IOUtils.closeWhileHandlingException(storeReference);
this.mergeScheduler.removeListener(mergeSchedulerListener);
this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
engineConfig.getIndexSettingsService().removeListener(listener);
}
}
}
@ -1419,7 +1271,10 @@ public class InternalEngine implements Engine {
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
} finally {
close();
if (indexWriter != null) {
// we might be not yet be fully constructed - don't call close
close();
}
}
}
} else {
@ -1456,31 +1311,31 @@ public class InternalEngine implements Engine {
private IndexWriter createWriter() throws IOException {
try {
boolean create = !Lucene.indexExists(store.directory());
IndexWriterConfig config = new IndexWriterConfig(analysisService.defaultIndexAnalyzer());
config.setCommitOnClose(false); // we by default don't commit on close
config.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
config.setIndexDeletionPolicy(deletionPolicy);
config.setInfoStream(new LoggerInfoStream(logger));
config.setMergeScheduler(mergeScheduler.newMergeScheduler());
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
iwc.setInfoStream(new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler.newMergeScheduler());
MergePolicy mergePolicy = mergePolicyProvider.getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
config.setMergePolicy(mergePolicy);
config.setSimilarity(similarityService.similarity());
config.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
config.setMaxThreadStates(indexConcurrency);
config.setCodec(codecService.codec(codecName));
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac());
iwc.setMaxThreadStates(engineConfig.getIndexConcurrency());
iwc.setCodec(engineConfig.getCodec());
/* We set this timeout to a highish value to work around
* the default poll interval in the Lucene lock that is
* 1000ms by default. We might need to poll multiple times
* here but with 1s poll this is only executed twice at most
* in combination with the default writelock timeout*/
config.setWriteLockTimeout(5000);
config.setUseCompoundFile(this.compoundOnFlush);
iwc.setWriteLockTimeout(5000);
iwc.setUseCompoundFile(this.engineConfig.isCompoundOnFlush());
// Warm-up hook for newly-merged segments. Warming up segments here is better since it will be performed at the end
// of the merge operation and won't slow down _refresh
config.setMergedSegmentWarmer(new IndexReaderWarmer() {
iwc.setMergedSegmentWarmer(new IndexReaderWarmer() {
@Override
public void warm(LeafReader reader) throws IOException {
try {
@ -1502,7 +1357,7 @@ public class InternalEngine implements Engine {
}
}
});
return new IndexWriter(store.directory(), config);
return new IndexWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
boolean isLocked = IndexWriter.isLocked(store.directory());
logger.warn("Could not lock IndexWriter isLocked [{}]", ex, isLocked);
@ -1568,18 +1423,19 @@ public class InternalEngine implements Engine {
class SearchFactory extends SearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
searcher.setSimilarity(similarityService.similarity());
searcher.setSimilarity(engineConfig.getSimilarity());
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
IndexSearcher newSearcher = null;
boolean closeNewSearcher = false;
try {
if (searcherManager == null) {
// fresh index writer, just do on all of it
if (indexWriter == null) {
// we are starting up - no writer active so we can't acquire a searcher.
newSearcher = searcher;
} else {
try (final Searcher currentSearcher = acquireSearcher("search_factory")) {
@ -1778,27 +1634,59 @@ public class InternalEngine implements Engine {
}
long getGcDeletesInMillis() {
return gcDeletesInMillis;
}
String getCodecName() {
return codecName;
}
boolean isCompoundOnFlush() {
return compoundOnFlush;
}
int getIndexConcurrency() {
return indexConcurrency;
}
boolean isFailEngineOnCorruption() {
return failEngineOnCorruption;
return engineConfig.getGcDeletesInMillis();
}
LiveIndexWriterConfig getCurrentIndexWriterConfig() {
IndexWriter writer = currentIndexWriter();
return writer == null ? null : writer.getConfig();
return currentIndexWriter().getConfig();
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
if (Lucene.isCorruptionException(e)) {
if (engineConfig.isFailEngineOnCorruption()) {
failEngine("corrupt file detected source: [merge]", e);
} else {
logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
}
} else if (engineConfig.isFailOnMergeFailure()) {
failEngine("merge exception", e);
}
}
}
class MergeSchedulerListener implements MergeSchedulerProvider.Listener {
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();
@Override
public synchronized void beforeMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
indexingService.throttlingActivated();
activateThrottling();
}
}
}
@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
indexingService.throttlingDeactivated();
deactivateThrottling();
}
}
}
}
EngineConfig config() {
return engineConfig;
}
}

View File

@ -16,19 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
/**
*
*/
public class InternalEngineModule extends AbstractModule {
public class InternalEngineFactory implements EngineFactory {
@Override
protected void configure() {
bind(Engine.class).to(InternalEngineHolder.class).asEagerSingleton();
public Engine newEngine(EngineConfig config) {
return new InternalEngine(config);
}
}

View File

@ -1,470 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A shard-level {@link Engine} that owns the lifecycle of an {@link InternalEngine}
 * instance: it creates, starts, stops and closes the actual engine and delegates every
 * engine operation to the currently live instance. It also reacts to dynamic index
 * settings changes, merge failures and merge scheduling events, pushing them to (or
 * failing) the wrapped engine.
 *
 * Thread-safety: the live engine is held in an {@link AtomicReference}; lifecycle
 * transitions (start/stop/close/updateSettings) are {@code synchronized} on this holder,
 * while delegating operations read the reference lock-free via {@link #engineSafe()}.
 */
public class InternalEngineHolder extends AbstractIndexShardComponent implements IndexShardComponent, Engine, Engine.FailedEngineListener {

    // Listeners registered with the merge scheduler / index settings service in the
    // constructor and removed again in close().
    private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
    private final ApplySettings settingsListener;
    private final MergeScheduleListener mergeSchedulerListener;

    // Dynamically updatable engine settings. A new engine samples these at creation
    // time (see createEngine()); a running engine is refreshed via updateSettings().
    protected volatile Boolean failOnMergeFailure;
    protected volatile boolean failEngineOnCorruption;
    protected volatile ByteSizeValue indexingBufferSize;
    protected volatile int indexConcurrency;
    protected volatile boolean compoundOnFlush = true;
    protected long gcDeletesInMillis;
    protected volatile boolean enableGcDeletes = true;
    protected volatile String codecName;
    protected final boolean optimizeAutoGenerateId;

    // Shard-level services handed to every engine instance this holder creates.
    protected final ThreadPool threadPool;
    protected final ShardIndexingService indexingService;
    protected final IndexSettingsService indexSettingsService;
    @Nullable
    protected final InternalIndicesWarmer warmer;
    protected final Store store;
    protected final SnapshotDeletionPolicy deletionPolicy;
    protected final Translog translog;
    protected final MergePolicyProvider mergePolicyProvider;
    protected final MergeSchedulerProvider mergeScheduler;
    protected final AnalysisService analysisService;
    protected final SimilarityService similarityService;
    protected final CodecService codecService;

    // The currently live engine, or null while stopped / not yet started / closed.
    private final AtomicReference<InternalEngine> currentEngine = new AtomicReference<>();
    // Once set to true (under "synchronized") the holder can never be started again.
    private volatile boolean closed = false;

    // Dynamic index setting keys handled by the ApplySettings listener below.
    public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
    public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
    public static final String INDEX_GC_DELETES = "index.gc_deletes";
    public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
    public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";

    public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);

    // Listeners to notify when the wrapped engine fails (see onFailedEngine()).
    private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();

    /**
     * Creates the holder, samples all engine settings from {@code indexSettings},
     * registers the merge-scheduler and settings listeners and takes a reference on
     * the store for the holder's lifetime (released in {@link #close()}).
     * Note: no engine is created here — callers must invoke {@link #start()}.
     */
    @Inject
    public InternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
                                IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
                                Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
                                MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
                                AnalysisService analysisService, SimilarityService similarityService, CodecService codecService) throws EngineException {
        super(shardId, indexSettings);
        Preconditions.checkNotNull(store, "Store must be provided to the engine");
        Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
        Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
        this.gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueSeconds(60)).millis();
        this.indexingBufferSize = componentSettings.getAsBytesSize("index_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB)); // not really important, as it is set by the IndexingMemory manager
        this.codecName = indexSettings.get(INDEX_CODEC, "default");
        this.threadPool = threadPool;
        this.indexSettingsService = indexSettingsService;
        this.indexingService = indexingService;
        this.warmer = (InternalIndicesWarmer) warmer;
        this.store = store;
        this.deletionPolicy = deletionPolicy;
        this.translog = translog;
        this.mergePolicyProvider = mergePolicyProvider;
        this.mergeScheduler = mergeScheduler;
        this.analysisService = analysisService;
        this.similarityService = similarityService;
        this.codecService = codecService;
        this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
        this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
        this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
        this.failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, true);
        this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
        this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
        this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
        this.mergeSchedulerListener = new MergeScheduleListener();
        this.mergeScheduler.addListener(mergeSchedulerListener);
        this.settingsListener = new ApplySettings(logger, this);
        this.indexSettingsService.addListener(this.settingsListener);
        // Held for the lifetime of this holder; released in close().
        store.incRef();
    }

    @Override
    public TimeValue defaultRefreshInterval() {
        return DEFAULT_REFRESH_INTERVAL;
    }

    /**
     * Returns the live engine, or throws {@link EngineClosedException} if there is
     * none (not started, stopped, or closed/failed).
     */
    public InternalEngine engineSafe() {
        InternalEngine engine = currentEngine.get();
        if (engine == null) {
            throw new EngineClosedException(shardId);
        }
        return engine;
    }

    /**
     * Records the flag and forwards it to the live engine, if any. The recorded
     * value is also applied to engines created later (see createEngine()).
     */
    @Override
    public void enableGcDeletes(boolean enableGcDeletes) {
        this.enableGcDeletes = enableGcDeletes;
        InternalEngine currentEngine = this.currentEngine.get();
        if (currentEngine != null) {
            currentEngine.enableGcDeletes(enableGcDeletes);
        }
    }

    /**
     * Records the new buffer size and forwards it to the live engine, if any.
     */
    @Override
    public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
        this.indexingBufferSize = indexingBufferSize;
        InternalEngine currentEngine = this.currentEngine.get();
        if (currentEngine != null) {
            currentEngine.updateIndexingBufferSize(indexingBufferSize);
        }
    }

    @Override
    public void addFailedEngineListener(FailedEngineListener listener) {
        failedEngineListeners.add(listener);
    }

    /**
     * Creates and starts a new engine. Fails if the holder is closed or an engine
     * is already running. Synchronized so only one engine can ever be installed;
     * the CAS is therefore expected to always succeed (see the assert).
     */
    @Override
    public synchronized void start() throws EngineException {
        if (closed) {
            throw new EngineClosedException(shardId);
        }
        InternalEngine currentEngine = this.currentEngine.get();
        if (currentEngine != null) {
            throw new EngineAlreadyStartedException(shardId);
        }
        InternalEngine newEngine = createEngine();
        // Extra store reference held only for the duration of the start attempt,
        // presumably to keep the store open while the engine spins up — the
        // holder-lifetime reference was taken in the constructor.
        store.incRef();
        try {
            newEngine.start();
            boolean success = this.currentEngine.compareAndSet(null, newEngine);
            assert success : "engine changes should be done under a synchronize";
        } finally {
            store.decRef();
        }
    }

    /**
     * Stops and closes the live engine, if any. The holder itself stays usable:
     * start() may be called again afterwards.
     */
    @Override
    public synchronized void stop() throws EngineException {
        InternalEngine currentEngine = this.currentEngine.getAndSet(null);
        if (currentEngine != null) {
            currentEngine.close();
        }
    }

    /**
     * Permanently closes the holder: shuts down the live engine, deregisters all
     * listeners and releases the constructor's store reference. Idempotent.
     */
    @Override
    public synchronized void close() throws ElasticsearchException {
        if (closed == false) {
            closed = true;
            try {
                InternalEngine currentEngine = this.currentEngine.getAndSet(null);
                if (currentEngine != null) {
                    currentEngine.close();
                }
                mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
                mergeScheduler.removeListener(mergeSchedulerListener);
                indexSettingsService.removeListener(settingsListener);
            } finally {
                store.decRef();
            }
        }
    }

    /**
     * Builds a new engine from the currently recorded settings; the holder itself
     * is passed as the failure listener so onFailedEngine() is invoked on failure.
     */
    protected InternalEngine createEngine() {
        return new InternalEngine(shardId, logger, codecService, threadPool, indexingService,
                warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
                enableGcDeletes, gcDeletesInMillis,
                indexingBufferSize, codecName, compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, this);
    }

    // --- Engine operations: all delegate to the live engine via engineSafe(),
    // --- throwing EngineClosedException when no engine is running.

    @Override
    public void create(Create create) throws EngineException {
        engineSafe().create(create);
    }

    @Override
    public void index(Index index) throws EngineException {
        engineSafe().index(index);
    }

    @Override
    public void delete(Delete delete) throws EngineException {
        engineSafe().delete(delete);
    }

    @Override
    public void delete(DeleteByQuery delete) throws EngineException {
        engineSafe().delete(delete);
    }

    @Override
    public GetResult get(Get get) throws EngineException {
        return engineSafe().get(get);
    }

    @Override
    public Searcher acquireSearcher(String source) throws EngineException {
        return engineSafe().acquireSearcher(source);
    }

    @Override
    public SegmentsStats segmentsStats() {
        return engineSafe().segmentsStats();
    }

    @Override
    public List<Segment> segments(boolean verbose) {
        return engineSafe().segments(verbose);
    }

    @Override
    public boolean refreshNeeded() {
        return engineSafe().refreshNeeded();
    }

    @Override
    public void refresh(String source, boolean force) throws EngineException {
        engineSafe().refresh(source, force);
    }

    @Override
    public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException, FlushNotAllowedEngineException {
        engineSafe().flush(type, force, waitIfOngoing);
    }

    @Override
    public void forceMerge(boolean flush, boolean waitForMerge) {
        engineSafe().forceMerge(flush, waitForMerge);
    }

    @Override
    public void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
        engineSafe().forceMerge(flush, waitForMerge, maxNumSegments, onlyExpungeDeletes, upgrade);
    }

    @Override
    public SnapshotIndexCommit snapshotIndex() throws EngineException {
        return engineSafe().snapshotIndex();
    }

    @Override
    public void recover(RecoveryHandler recoveryHandler) throws EngineException {
        engineSafe().recover(recoveryHandler);
    }

    @Override
    public void failEngine(String reason, Throwable failure) {
        engineSafe().failEngine(reason, failure);
    }

    @Override
    public ShardId shardId() {
        return shardId;
    }

    @Override
    public Settings indexSettings() {
        return indexSettings;
    }

    /** return the current indexing buffer size setting * */
    public ByteSizeValue indexingBufferSize() {
        return indexingBufferSize;
    }

    // called by the current engine
    /**
     * Notifies all registered {@link FailedEngineListener}s (exceptions from
     * individual listeners are logged, not propagated) and then closes this
     * holder — once the engine has failed, all bets are off.
     */
    @Override
    public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
        try {
            for (FailedEngineListener listener : failedEngineListeners) {
                try {
                    listener.onFailedEngine(shardId, reason, failure);
                } catch (Exception e) {
                    logger.warn("exception while notifying engine failure", e);
                }
            }
        } finally {
            close(); // we need to close ourself - we failed all bets are off
        }
    }

    /**
     * Dynamic settings listener: copies changed values onto the holder and, when
     * any engine-visible setting changed, pushes them to the live engine via
     * {@link InternalEngineHolder#updateSettings()}.
     */
    static class ApplySettings implements IndexSettingsService.Listener {

        private final ESLogger logger;
        private final InternalEngineHolder holder;

        ApplySettings(ESLogger logger, InternalEngineHolder holder) {
            this.logger = logger;
            this.holder = holder;
        }

        @Override
        public void onRefreshSettings(Settings settings) {
            boolean change = false;
            long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(holder.gcDeletesInMillis)).millis();
            if (gcDeletesInMillis != holder.gcDeletesInMillis) {
                logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(holder.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
                holder.gcDeletesInMillis = gcDeletesInMillis;
                change = true;
            }

            final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush);
            if (compoundOnFlush != holder.compoundOnFlush) {
                logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush, compoundOnFlush);
                holder.compoundOnFlush = compoundOnFlush;
                change = true;
            }

            final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption);
            if (failEngineOnCorruption != holder.failEngineOnCorruption) {
                logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption, failEngineOnCorruption);
                holder.failEngineOnCorruption = failEngineOnCorruption;
                change = true;
            }
            int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, holder.indexConcurrency);
            if (indexConcurrency != holder.indexConcurrency) {
                logger.info("updating index.index_concurrency from [{}] to [{}]", holder.indexConcurrency, indexConcurrency);
                holder.indexConcurrency = indexConcurrency;
                // we have to flush in this case, since it only applies on a new index writer
                change = true;
            }
            final String codecName = settings.get(INDEX_CODEC, holder.codecName);
            if (!codecName.equals(holder.codecName)) {
                logger.info("updating index.codec from [{}] to [{}]", holder.codecName, codecName);
                holder.codecName = codecName;
                // we want to flush in this case, so the new codec will be reflected right away...
                change = true;
            }
            final boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure);
            if (failOnMergeFailure != holder.failOnMergeFailure) {
                logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure, failOnMergeFailure);
                holder.failOnMergeFailure = failOnMergeFailure;
                // note: does not set "change" — this flag is only read by the
                // holder's own merge-failure listener, never by the engine
            }

            if (change) {
                holder.updateSettings();
            }
        }
    }

    /**
     * Pushes the holder's current engine-visible settings to the live engine.
     */
    synchronized void updateSettings() {
        // we need to make sure that we wait for the engine to be fully initialized
        // the start method sets the current engine once it's done but samples the settings
        // at construction time.
        final InternalEngine engine = currentEngine.get();
        if (engine != null) {
            engine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);
        }
    }

    /**
     * Fails the engine when a merge fails: always on corruption (if configured),
     * otherwise only when {@code index.fail_on_merge_failure} is enabled.
     */
    class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
        @Override
        public void onFailedMerge(MergePolicy.MergeException e) {
            if (Lucene.isCorruptionException(e)) {
                if (failEngineOnCorruption) {
                    failEngine("corrupt file detected source: [merge]", e);
                } else {
                    logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, INDEX_FAIL_ON_CORRUPTION, failEngineOnCorruption);
                }
            } else if (failOnMergeFailure) {
                failEngine("merge exception", e);
            }
        }
    }

    /**
     * Tracks the number of in-flight merges and toggles indexing throttling on the
     * live engine when it crosses the scheduler's max-merge limit.
     */
    class MergeScheduleListener implements MergeSchedulerProvider.Listener {
        private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
        private final AtomicBoolean isThrottling = new AtomicBoolean();

        @Override
        public synchronized void beforeMerge(OnGoingMerge merge) {
            int maxNumMerges = mergeScheduler.getMaxMerges();
            InternalEngine currentEngineImpl = currentEngine.get();
            if (numMergesInFlight.incrementAndGet() > maxNumMerges && currentEngineImpl != null) {
                if (isThrottling.getAndSet(true) == false) {
                    logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
                    indexingService.throttlingActivated();
                    currentEngineImpl.activateThrottling();
                }
            }
        }

        @Override
        public synchronized void afterMerge(OnGoingMerge merge) {
            int maxNumMerges = mergeScheduler.getMaxMerges();
            InternalEngine currentEngineImpl = currentEngine.get();
            if (numMergesInFlight.decrementAndGet() < maxNumMerges && currentEngineImpl != null) {
                if (isThrottling.getAndSet(false)) {
                    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
                    indexingService.throttlingDeactivated();
                    currentEngineImpl.deactivateThrottling();
                }
            }
        }
    }
}

View File

@ -28,7 +28,7 @@ import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.engine.internal.InternalEngineHolder;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
@ -41,7 +41,7 @@ import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer;
/**
*/
@ -82,12 +82,12 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_GC_DELETES, Validator.TIME);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_CODEC);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE);
indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CONCURRENCY_SETTING, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CODEC_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
@ -118,7 +118,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH);
indexDynamicSettings.addDynamicSetting(InternalIndicesWarmer.INDEX_WARMER_ENABLED);
indexDynamicSettings.addDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED);
indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN);
}

View File

@ -28,6 +28,7 @@ import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
@ -38,6 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -46,10 +48,12 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
@ -57,13 +61,9 @@ import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
@ -79,6 +79,7 @@ import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
@ -90,6 +91,7 @@ import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
@ -102,6 +104,7 @@ import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@ -109,8 +112,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.mapper.SourceToParse.source;
@ -127,7 +132,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
private final InternalIndicesLifecycle indicesLifecycle;
private final Store store;
private final MergeSchedulerProvider mergeScheduler;
private final Engine engine;
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
private final Translog translog;
private final IndexAliasesService indexAliasesService;
private final ShardIndexingService indexingService;
@ -148,6 +153,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private final EngineConfig config;
private final EngineFactory engineFactory;
private long checkIndexTook = 0;
private volatile IndexShardState state;
@ -165,16 +172,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
@Inject
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache) {
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, AnalysisService analysisService, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
super(shardId, indexSettings);
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
Preconditions.checkNotNull(translog, "Translog must be provided to the index shard");
this.engineFactory = factory;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.engine = engine;
this.mergeScheduler = mergeScheduler;
this.translog = translog;
this.threadPool = threadPool;
@ -198,26 +211,28 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
this.shardSuggestService = shardSuggestService;
this.shardBitsetFilterCache = shardBitsetFilterCache;
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval());
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
indexSettingsService.addListener(applyRefreshSettings);
/* create engine config */
this.config = new EngineConfig(shardId,
indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, true),
threadPool,indexingService,indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
analysisService.defaultIndexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener);
logger.debug("state: [CREATED]");
this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");
}
public MergeSchedulerProvider mergeScheduler() {
return this.mergeScheduler;
}
public Store store() {
return this.store;
}
public Engine engine() {
return engine;
return engineSafe();
}
public Translog translog() {
@ -305,7 +320,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
engine.refresh("cluster_state_started", true);
engineSafe().refresh("cluster_state_started", true);
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
@ -400,7 +415,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
}
engine.create(create);
engineSafe().create(create);
create.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postCreate(create, ex);
@ -424,7 +439,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
engine.index(index);
engineSafe().index(index);
index.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postIndex(index, ex);
@ -447,7 +462,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
engine.delete(delete);
engineSafe().delete(delete);
delete.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postDelete(delete, ex);
@ -475,14 +490,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
logger.trace("delete_by_query [{}]", deleteByQuery.query());
}
deleteByQuery = indexingService.preDeleteByQuery(deleteByQuery);
engine.delete(deleteByQuery);
engineSafe().delete(deleteByQuery);
deleteByQuery.endTime(System.nanoTime());
indexingService.postDeleteByQuery(deleteByQuery);
}
public Engine.GetResult get(Engine.Get get) throws ElasticsearchException {
readAllowed();
return engine.get(get);
return engineSafe().get(get);
}
public void refresh(String source, boolean force) throws ElasticsearchException {
@ -491,7 +506,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
logger.trace("refresh with soruce: {} force: {}", source, force);
}
long time = System.nanoTime();
engine.refresh(source, force);
engineSafe().refresh(source, force);
refreshMetric.inc(System.nanoTime() - time);
}
@ -539,7 +554,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
}
public SegmentsStats segmentStats() {
SegmentsStats segmentsStats = engine.segmentsStats();
SegmentsStats segmentsStats = engineSafe().segmentsStats();
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
return segmentsStats;
}
@ -601,7 +616,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
logger.trace("flush with {}", request);
}
long time = System.nanoTime();
engine.flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
engineSafe().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
flushMetric.inc(System.nanoTime() - time);
}
@ -610,7 +625,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
engine.forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
engineSafe().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
}
@ -618,7 +633,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return engine.snapshotIndex();
return engineSafe().snapshotIndex();
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
@ -626,12 +641,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException {
verifyStarted();
engine.recover(recoveryHandler);
engineSafe().recover(recoveryHandler);
}
public void failShard(String reason, Throwable e) {
// fail the engine. This will cause this shard to also be removed from the node's index service.
engine.failEngine(reason, e);
engineSafe().failEngine(reason, e);
}
public Engine.Searcher acquireSearcher(String source) {
@ -640,19 +655,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
public Engine.Searcher acquireSearcher(String source, boolean searcherForWriteOperation) {
readAllowed(searcherForWriteOperation);
return engine.acquireSearcher(source);
return engineSafe().acquireSearcher(source);
}
public void close(String reason) {
public void close(String reason) throws IOException {
synchronized (mutex) {
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
FutureUtils.cancel(mergeScheduleFuture);
mergeScheduleFuture = null;
try {
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
FutureUtils.cancel(mergeScheduleFuture);
mergeScheduleFuture = null;
}
changeState(IndexShardState.CLOSED, reason);
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
}
changeState(IndexShardState.CLOSED, reason);
}
}
@ -675,7 +695,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
checkIndex(true);
}
engine.start();
createNewEngine();
startScheduledTasksIfNeeded();
changeState(IndexShardState.POST_RECOVERY, reason);
}
@ -696,8 +716,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
}
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
engine.enableGcDeletes(false);
engine.start();
config.setEnableGcDeletes(false);
createNewEngine();
}
/**
@ -714,17 +734,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
if (withFlush) {
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engineSafe().flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
}
// clear unreferenced files
translog.clearUnreferenced();
engine.refresh("recovery_finalization", true);
engineSafe().refresh("recovery_finalization", true);
synchronized (mutex) {
changeState(IndexShardState.POST_RECOVERY, "post recovery");
}
indicesLifecycle.afterIndexShardPostRecovery(this);
startScheduledTasksIfNeeded();
engine.enableGcDeletes(true);
config.setEnableGcDeletes(true);
}
/**
@ -744,7 +764,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
source(create.source()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
engine.create(engineCreate);
engineSafe().create(engineCreate);
indexOperation = engineCreate;
break;
case SAVE:
@ -752,18 +772,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
Engine.Index engineIndex = prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
engine.index(engineIndex);
engineSafe().index(engineIndex);
indexOperation = engineIndex;
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text());
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
engineSafe().delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false));
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types()));
engineSafe().delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types()));
break;
default:
throw new ElasticsearchIllegalStateException("No operation defined for [" + operation + "]");
@ -874,6 +894,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
/**
 * Registers a listener that is notified when this shard's engine fails.
 * Listeners are added to the shard-level {@code failedEngineListener}
 * delegate list, which the current engine reports failures to.
 */
public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
    this.failedEngineListener.delegates.add(failedEngineListener);
}
/**
 * Updates the indexing and translog buffer sizes of this shard.
 *
 * @param shardIndexingBufferSize new indexing buffer size applied to the engine
 * @param shardTranslogBufferSize new buffer size applied to the translog
 * @throws EngineClosedException if the engine has not been created yet or is closed
 */
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
    Engine engine = engineSafe();
    engine.updateIndexingBufferSize(shardIndexingBufferSize);
    // BUG FIX: previously the indexing buffer size was passed to the translog,
    // leaving the shardTranslogBufferSize parameter entirely unused.
    translog().updateBuffer(shardTranslogBufferSize);
}
/**
 * Marks this shard as indexing-inactive by shrinking both the engine's
 * indexing buffer and the translog buffer to their dedicated "inactive"
 * sizes ({@code EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER} /
 * {@code Translog.INACTIVE_SHARD_TRANSLOG_BUFFER}).
 *
 * @throws EngineClosedException if the engine has not been created yet or is closed
 */
public void markAsInactive() {
    Engine engine = engineSafe();
    engine.updateIndexingBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
    translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
@ -912,7 +948,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
@Override
public void run() {
try {
if (engine.refreshNeeded()) {
if (engineSafe().refreshNeeded()) {
refresh("schedule", false);
}
} catch (EngineClosedException e) {
@ -993,4 +1029,43 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
logger.warn("failed to check index", e);
}
}
/**
 * Returns the currently live {@link Engine} of this shard.
 *
 * @return the engine held by {@code currentEngineReference}
 * @throws EngineClosedException if no engine is set (the shard was closed,
 *         the engine failed, or it has not been created yet)
 */
private Engine engineSafe() {
    Engine engine = this.currentEngineReference.get();
    if (engine == null) {
        throw new EngineClosedException(shardId);
    }
    return engine;
}
/**
 * Fans an engine failure out to all registered {@link Engine.FailedEngineListener}s
 * and then closes the failed engine. Individual listener exceptions are logged and
 * swallowed so every delegate gets notified.
 */
class ShardEngineFailListener implements Engine.FailedEngineListener {
    private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();

    // called by the current engine
    @Override
    public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
        try {
            for (Engine.FailedEngineListener listener : delegates) {
                try {
                    listener.onFailedEngine(shardId, reason, failure);
                } catch (Exception e) {
                    logger.warn("exception while notifying engine failure", e);
                }
            }
        } finally {
            // We need to close ourself - we failed, all bets are off.
            // BUG FIX: read the reference directly instead of calling engineSafe():
            // if a concurrent close() already cleared the reference, engineSafe()
            // would throw EngineClosedException out of this finally block and mask
            // the original failure. closeWhileHandlingException tolerates null and
            // suppresses any exception thrown while closing.
            IOUtils.closeWhileHandlingException(currentEngineReference.get());
        }
    }
}
/**
 * Creates a fresh {@link Engine} from this shard's {@code config} via the
 * injected {@code engineFactory} and publishes it in
 * {@code currentEngineReference}. Engines are never restarted: a new one may
 * only be created while no engine is currently set.
 *
 * @throws EngineClosedException if the shard has already been closed
 */
private void createNewEngine() {
    synchronized (mutex) {
        // Guard against racing with close(): never create an engine for a closed shard.
        if (state == IndexShardState.CLOSED) {
            throw new EngineClosedException(shardId);
        }
        // A previous engine must have been removed first; engines are not restarted.
        assert this.currentEngineReference.get() == null;
        this.currentEngineReference.set(engineFactory.newEngine(config));
    }
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.internal.InternalEngineFactory;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
/**
@ -28,9 +30,14 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
*/
public class IndexShardModule extends AbstractModule {
private final ShardId shardId;
public static final String ENGINE_FACTORY = "index.engine.factory";
private static final Class<? extends EngineFactory> DEFAULT_ENGINE_FACTORY_CLASS = InternalEngineFactory.class;
public IndexShardModule(ShardId shardId) {
private final ShardId shardId;
private final Settings settings;
public IndexShardModule(ShardId shardId, Settings settings) {
this.settings = settings;
this.shardId = shardId;
}
@ -38,6 +45,9 @@ public class IndexShardModule extends AbstractModule {
protected void configure() {
bind(ShardId.class).toInstance(shardId);
bind(IndexShard.class).asEagerSingleton();
bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS,
"org.elasticsearch.index.engine.", "EngineFactory"));
bind(ShardIndexWarmerService.class).asEagerSingleton();
}
}

View File

@ -40,8 +40,6 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
/**
*
@ -77,7 +75,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesFieldDataCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
bind(IndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();

View File

@ -17,9 +17,10 @@
* under the License.
*/
package org.elasticsearch.indices.warmer;
package org.elasticsearch.indices;
import com.google.common.collect.Lists;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
@ -27,6 +28,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -37,7 +40,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer {
public final class IndicesWarmer extends AbstractComponent {
public static final String INDEX_WARMER_ENABLED = "index.warmer.enabled";
@ -50,19 +53,17 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
@Inject
public InternalIndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
public IndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
}
@Override
public void addListener(Listener listener) {
listeners.add(listener);
}
@Override
public void removeListener(Listener listener) {
listeners.remove(listener);
}
@ -100,7 +101,7 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
}
indexShard.warmerService().onPreWarm();
long time = System.nanoTime();
final List<IndicesWarmer.Listener.TerminationHandle> terminationHandles = Lists.newArrayList();
final List<TerminationHandle> terminationHandles = Lists.newArrayList();
// get a handle on pending tasks
for (final Listener listener : listeners) {
if (topReader) {
@ -110,7 +111,7 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
}
}
// wait for termination
for (IndicesWarmer.Listener.TerminationHandle terminationHandle : terminationHandles) {
for (TerminationHandle terminationHandle : terminationHandles) {
try {
terminationHandle.awaitTermination();
} catch (InterruptedException e) {
@ -134,4 +135,55 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
}
}
/** A handle on the execution of warm-up action. */
public interface TerminationHandle {

    /** Handle for warm-up work that already completed; awaiting it returns immediately. */
    public static TerminationHandle NO_WAIT = new TerminationHandle() {
        @Override
        public void awaitTermination() {}
    };

    /** Wait until execution of the warm-up action completes. */
    void awaitTermination() throws InterruptedException;
}
/**
 * Callback invoked by the warmer when new segments or a new top-level reader
 * become available. Implementations queue warm-up tasks and hand back
 * {@link TerminationHandle}s the warmer waits on.
 */
public static abstract class Listener {

    /** Name of the thread pool this listener's warm-up tasks should run on. */
    public String executor() {
        return ThreadPool.Names.WARMER;
    }

    /** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */
    public abstract TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);

    /** Queue tasks to warm the top-level reader and return handles to wait for their termination. */
    public abstract TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);
}
/**
 * Immutable context handed to warmer {@link Listener}s: the shard being warmed
 * together with a searcher restricted to the segments that need warming.
 */
public static final class WarmerContext {

    private final ShardId warmedShardId;
    private final Engine.Searcher warmingSearcher;

    public WarmerContext(ShardId shardId, Engine.Searcher searcher) {
        this.warmedShardId = shardId;
        this.warmingSearcher = searcher;
    }

    /** Returns the id of the shard being warmed. */
    public ShardId shardId() {
        return warmedShardId;
    }

    /** Return a searcher instance that only wraps the segments to warm. */
    public Engine.Searcher searcher() {
        return warmingSearcher;
    }

    /** Returns the reader backing {@link #searcher()}. */
    public IndexReader reader() {
        return warmingSearcher.reader();
    }

    @Override
    public String toString() {
        return "WarmerContext: " + warmingSearcher.reader();
    }
}
}

View File

@ -682,7 +682,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
IndexShard indexShard = indexService.createShard(shardId);
indexShard.routingEntry(shardRouting);
indexShard.engine().addFailedEngineListener(failedEngineHandler);
indexShard.addFailedEngineListener(failedEngineHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
@ -163,8 +164,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
for (IndexShard indexShard : activeToInactiveIndexingShards) {
// update inactive indexing buffer size
try {
((IndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
((IndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
indexShard.markAsInactive();
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
@ -193,7 +193,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
final long time = threadPool.estimatedTimeInMillis();
Translog translog = ((IndexShard) indexShard).translog();
Translog translog = indexShard.translog();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null) {
status = new ShardIndexingStatus();
@ -213,7 +213,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
activeToInactiveIndexingShards.add(indexShard);
status.activeIndexing = false;
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
}
}
} else {
@ -301,8 +301,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || status.activeIndexing) {
try {
((IndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
((IndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
indexShard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
continue;

View File

@ -1,89 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.warmer;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;
/**
*/
public interface IndicesWarmer {
public abstract class Listener {
public String executor() {
return ThreadPool.Names.WARMER;
}
/** A handle on the execution of warm-up action. */
public static interface TerminationHandle {
public static TerminationHandle NO_WAIT = new TerminationHandle() {
@Override
public void awaitTermination() {}
};
/** Wait until execution of the warm-up action completes. */
void awaitTermination() throws InterruptedException;
}
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */
public abstract TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);
public abstract TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);
}
public static class WarmerContext {
private final ShardId shardId;
private final Engine.Searcher searcher;
public WarmerContext(ShardId shardId, Engine.Searcher searcher) {
this.shardId = shardId;
this.searcher = searcher;
}
public ShardId shardId() {
return shardId;
}
/** Return a searcher instance that only wraps the segments to warm. */
public Engine.Searcher searcher() {
return searcher;
}
public IndexReader reader() {
return searcher.reader();
}
@Override
public String toString() {
return "WarmerContext: " + searcher.reader();
}
}
void addListener(Listener listener);
void removeListener(Listener listener);
}

View File

@ -67,8 +67,9 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.dfs.CachedDfSource;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
@ -43,13 +44,13 @@ public class InternalEngineIntegrationTest extends ElasticsearchIntegrationTest
refresh();
assertTotalCompoundSegments(1, 1, "test");
client().admin().indices().prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false)).get();
.setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false)).get();
client().prepareIndex("test", "foo").setSource("field", "foo").get();
refresh();
assertTotalCompoundSegments(1, 2, "test");
client().admin().indices().prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true)).get();
.setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true)).get();
client().prepareIndex("test", "foo").setSource("field", "foo").get();
refresh();
assertTotalCompoundSegments(2, 3, "test");

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index.engine.internal;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import static org.hamcrest.Matchers.is;
@ -29,11 +30,12 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
public void testLuceneSettings() {
final IndexService service = createIndex("foo");
// INDEX_COMPOUND_ON_FLUSH
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(false));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
InternalEngine engine = ((InternalEngine)engine(service));
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(true));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(false));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(true));
}

View File

@ -29,16 +29,14 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
@ -47,7 +45,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
@ -68,7 +65,6 @@ import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
@ -88,7 +84,6 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
@ -122,34 +117,25 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void setUp() throws Exception {
super.setUp();
defaultSettings = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
.put(InternalEngineHolder.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, randomBoolean())
.build(); // TODO randomize more settings
threadPool = new ThreadPool(getClass().getName());
store = createStore();
store.deleteContent();
storeReplica = createStore();
storeReplica.deleteContent();
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
engine = createEngine(engineSettingsService, store, createTranslog());
if (randomBoolean()) {
engine.enableGcDeletes(false);
((InternalEngine)engine).config().setEnableGcDeletes(false);
}
engine.start();
if (randomBoolean()) {
engine.stop();
engine.start();
}
replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
replicaEngine = createEngine(replicaSettingsService, storeReplica, createTranslogReplica());
if (randomBoolean()) {
replicaEngine.enableGcDeletes(false);
}
replicaEngine.start();
if (randomBoolean()) {
engine.stop();
engine.start();
((InternalEngine)engine).config().setEnableGcDeletes(false);
}
}
@ -231,8 +217,22 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngineHolder(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
new AnalysisService(shardId.index(), indexSettingsService.getSettings()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
return new InternalEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider));
}
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
IndexWriterConfig iwc = newIndexWriterConfig();
EngineConfig config = new EngineConfig(shardId, true, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
});
return config;
}
protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
@ -245,7 +245,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats().getCount(), equalTo(0l));
assertThat(engine.segmentsStats().getMemoryInBytes(), equalTo(0l));
final boolean defaultCompound = defaultSettings.getAsBoolean(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true);
final boolean defaultCompound = defaultSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true);
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
@ -282,7 +282,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false).build());
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build());
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
@ -330,7 +330,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true).build());
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build());
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh("test", false);
@ -385,35 +385,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
public void testStartAndAcquireConcurrently() throws IOException {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final Store store = createStore();
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
final AtomicBoolean startPending = new AtomicBoolean(true);
Thread thread = new Thread() {
public void run() {
try {
Thread.yield();
engine.start();
} finally {
startPending.set(false);
}
}
};
thread.start();
while (startPending.get()) {
try {
engine.acquireSearcher("foobar").close();
break;
} catch (EngineClosedException ex) {
// all good
}
}
engine.close();
store.close();
}
@Test
public void testSegmentsWithMergeFlag() throws Exception {
@ -442,7 +413,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
});
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
engine.start();
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@ -716,7 +686,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, false);
final boolean failEngine = defaultSettings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, false);
final int failInPhase = randomIntBetween(1, 3);
try {
engine.recover(new Engine.RecoveryHandler() {
@ -1396,18 +1366,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
// Make sure enableGCDeletes == false works:
Settings settings = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_GC_DELETES, "0ms")
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms")
.build();
Engine engine = new InternalEngineHolder(shardId, settings, threadPool,
engineSettingsService,
new ShardIndexingService(shardId, settings,
new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, engineSettingsService)),
null, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy(), createMergeScheduler(engineSettingsService),
new AnalysisService(shardId.index(), engineSettingsService.getSettings()), new SimilarityService(shardId.index()),
new CodecService(shardId.index()));
engine.start();
engine.enableGcDeletes(false);
Engine engine = new InternalEngine(config(engineSettingsService, store, createTranslog(), createMergeScheduler(engineSettingsService)));
((InternalEngine)engine).config().setEnableGcDeletes(false);
// Add document
Document document = testDocument();
@ -1473,7 +1436,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
ShardId shardId = ShardUtils.extractShardId(test.reader());
assertNotNull(shardId);
assertEquals(shardId, engine.shardId());
assertEquals(shardId, ((InternalEngine) engine).config().getShardId());
}
}
@ -1498,32 +1461,29 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
Settings build = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), build);
Engine holder = createEngine(indexSettingsService, store, translog);
Engine holder;
try {
holder = createEngine(indexSettingsService, store, translog);
} catch (EngineCreationFailureException ex) {
assertEquals(store.refCount(), refCount);
continue;
}
indexSettingsService.refreshSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true).build());
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true).build());
assertEquals(store.refCount(), refCount+1);
final int numStarts = scaledRandomIntBetween(1, 5);
for (int j = 0; j < numStarts; j++) {
try {
holder.start();
assertEquals(store.refCount(), refCount + 2);
holder.stop();
assertEquals(store.refCount(), refCount + 1);
holder.close();
holder = createEngine(indexSettingsService, store, translog);
assertEquals(store.refCount(), refCount + 1);
} catch (EngineCreationFailureException ex) {
// all is fine
if (ex.getCause() instanceof CorruptIndexException) {
assertEquals(store.refCount(), refCount);
try {
holder.start();
fail("Engine must have failed on corrupt index");
} catch (EngineClosedException e) {
// good!
}
break; // failed engine can't start again
}
assertEquals(store.refCount(), refCount + 1);
assertEquals(store.refCount(), refCount);
break;
}
}
translog.close();
@ -1535,15 +1495,15 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@Test
public void testSettings() {
final InternalEngineHolder holder = (InternalEngineHolder) engine;
IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule();
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_GC_DELETES));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CODEC));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY));
assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(EngineConfig.INDEX_GC_DELETES_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_CODEC_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_CONCURRENCY_SETTING));
InternalEngine engine = (InternalEngine) this.engine;
CodecService codecService = new CodecService(shardId.index());
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
@ -1551,39 +1511,35 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
int indexConcurrency = randomIntBetween(1, 20);
String codecName = randomFrom(holder.codecService.availableCodecs());
String codecName = randomFrom(codecService.availableCodecs());
Settings build = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(InternalEngineHolder.INDEX_GC_DELETES, gcDeletes)
.put(InternalEngineHolder.INDEX_CODEC, codecName)
.put(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE, failOnMerge)
.put(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, indexConcurrency)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption)
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes)
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge)
.put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency)
.build();
engineSettingsService.refreshSettings(build);
LiveIndexWriterConfig currentIndexWriterConfig = holder.engineSafe().getCurrentIndexWriterConfig();
assertEquals(holder.compoundOnFlush, compoundOnFlush);
assertEquals(holder.engineSafe().isCompoundOnFlush(), compoundOnFlush);
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);
assertEquals(holder.gcDeletesInMillis, gcDeletes);
assertEquals(holder.engineSafe().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
assertEquals(holder.codecName, codecName);
assertEquals(holder.engineSafe().getCodecName(), codecName);
assertEquals(currentIndexWriterConfig.getCodec(), holder.codecService.codec(codecName));
assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(holder.failEngineOnCorruption, failOnCorruption);
assertEquals(holder.engineSafe().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(holder.failOnMergeFailure, failOnMerge); // only on the holder
assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder
assertEquals(holder.indexConcurrency, indexConcurrency);
assertEquals(holder.engineSafe().getIndexConcurrency(), indexConcurrency);
assertEquals(engine.config().getIndexConcurrency(), indexConcurrency);
assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency);

View File

@ -51,7 +51,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.engine.internal.InternalEngineHolder;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.IndexShardState;
@ -130,13 +130,13 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption)
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
if (failOnCorruption == false) { // test the dynamic setting
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)).get();
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)).get();
}
ensureGreen();
disableAllocation("test");
@ -240,7 +240,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
@ -330,7 +330,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)
// This does corrupt files on the replica, so we can't check:
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
@ -415,7 +415,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)
.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));

View File

@ -26,7 +26,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.internal.InternalEngineHolder;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -58,15 +58,15 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize &&
((InternalEngineHolder) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize;
return shard1.engine().indexingBufferSize().bytes() <= expected2ShardsSize &&
shard2.engine().indexingBufferSize().bytes() <= expected2ShardsSize;
}
});
if (!success) {
fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" +
((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" +
((InternalEngineHolder) shard2.engine()).indexingBufferSize().bytes() + "]"
shard1.engine().indexingBufferSize().bytes() + "] shard2 [" +
shard2.engine().indexingBufferSize().bytes() + "]"
);
}
@ -74,13 +74,13 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize;
return shard1.engine().indexingBufferSize().bytes() >= expected1ShardSize;
}
});
if (!success) {
fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" +
((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
shard1.engine().indexingBufferSize().bytes() + "]"
);
}
@ -99,12 +99,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
);
}
@ -113,12 +113,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().indexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
);
}
@ -127,12 +127,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
);
}
}

View File

@ -38,7 +38,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.engine.MockInternalEngineHolder;
import org.elasticsearch.test.engine.MockInternalEngine;
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
import org.junit.Test;
@ -108,10 +108,10 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
ImmutableSettings.Builder settings = settingsBuilder()
.put(indexSettings())
.put(MockInternalEngineHolder.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
.put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
.put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
.put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
.put(MockInternalEngineHolder.WRAP_READER_RATIO, 1.0d);
.put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
client().admin().indices().prepareCreate("test")
.setSettings(settings)
@ -201,7 +201,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
// TODO: Generalize this class and add it as a utility
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngineHolder.DirectoryReaderWrapper {
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
private final Settings settings;
static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {

View File

@ -35,7 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.engine.MockInternalEngineHolder;
import org.elasticsearch.test.engine.MockInternalEngine;
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.test.store.MockFSDirectoryService;
@ -230,10 +230,10 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
Builder settings = settingsBuilder()
.put(indexSettings())
.put(MockInternalEngineHolder.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
.put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
.put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
.put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
.put(MockInternalEngineHolder.WRAP_READER_RATIO, 1.0d);
.put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
assertAcked(prepareCreate("test")
.setSettings(settings)
@ -288,7 +288,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngineHolder.DirectoryReaderWrapper {
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
private final Settings settings;
static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {

View File

@ -35,10 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.engine.internal.InternalEngineHolder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
@ -193,8 +190,8 @@ public abstract class ElasticsearchSingleNodeTest extends ElasticsearchTestCase
return instanceFromNode.indexServiceSafe(index);
}
protected static InternalEngine engine(IndexService service) {
return ((InternalEngineHolder) ((IndexShard) service.shard(0)).engine()).engineSafe();
protected static org.elasticsearch.index.engine.Engine engine(IndexService service) {
return service.shard(0).engine();
}
/**

View File

@ -74,7 +74,7 @@ import org.elasticsearch.index.cache.filter.AutoFilterCachingPolicy;
import org.elasticsearch.index.cache.filter.FilterCacheModule;
import org.elasticsearch.index.cache.filter.none.NoneFilterCache;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.index.engine.EngineModule;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -89,7 +89,7 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.engine.MockEngineFactory;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
@ -364,8 +364,7 @@ public final class InternalTestCluster extends TestCluster {
.put(SETTING_CLUSTER_NODE_SEED, seed);
if (ENABLE_MOCK_MODULES && usually(random)) {
builder.put("index.store.type", MockFSIndexStoreModule.class.getName()); // no RAM dir for now!
builder.put(EngineModule.ENGINE_TYPE, MockEngineModule.class.getName());
builder.put(IndexShardModule.ENGINE_FACTORY, MockEngineFactory.class);
builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName());
builder.put(BigArraysModule.IMPL, MockBigArraysModule.class.getName());
builder.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName());

View File

@ -16,16 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.engine;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
public class MockEngineModule extends AbstractModule {
/**
*
*/
public final class MockEngineFactory implements EngineFactory {
@Override
protected void configure() {
bind(Engine.class).to(MockInternalEngineHolder.class).asEagerSingleton();
public Engine newEngine(EngineConfig config) {
return new MockInternalEngine(config);
}
}

View File

@ -18,53 +18,61 @@
*/
package org.elasticsearch.test.engine;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MockInternalEngine extends InternalEngine {
public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
public static class MockContext {
public final Random random;
public final boolean wrapReader;
public final Class<? extends FilterDirectoryReader> wrapper;
public final Settings indexSettings;
public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
this.random = random;
this.wrapReader = wrapReader;
this.wrapper = wrapper;
this.indexSettings = indexSettings;
}
}
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
private MockInternalEngineHolder.MockContext mockContext;
private MockContext mockContext;
public MockInternalEngine(MockInternalEngineHolder.MockContext mockContext, ShardId shardId, ESLogger logger, CodecService codecService,
ThreadPool threadPool, ShardIndexingService indexingService,
@Nullable IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, AnalysisService analysisService,
SimilarityService similarityService, boolean enableGcDeletes, long gcDeletesInMillis, ByteSizeValue indexingBufferSize, String codecName,
boolean compoundOnFlush, int indexConcurrency, boolean optimizeAutoGenerateId, boolean failEngineOnCorruption,
FailedEngineListener failedEngineListener) throws EngineException {
super(shardId, logger, codecService, threadPool, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider,
mergeScheduler, analysisService, similarityService, enableGcDeletes, gcDeletesInMillis, indexingBufferSize, codecName,
compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, failedEngineListener);
this.mockContext = mockContext;
public MockInternalEngine(EngineConfig config) throws EngineException {
super(config);
Settings indexSettings = config.getIndexSettings();
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
Random random = new Random(seed);
final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
boolean wrapReader = random.nextDouble() < ratio;
if (logger.isTraceEnabled()) {
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
}
mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
}
@Override
@ -189,4 +197,24 @@ public class MockInternalEngine extends InternalEngine {
}
}
public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
protected final SubReaderWrapper subReaderWrapper;
public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) {
super(in, subReaderWrapper);
this.subReaderWrapper = subReaderWrapper;
}
@Override
public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}
@Override
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
}
}
}

View File

@ -1,119 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.engine;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.engine.internal.InternalEngineHolder;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Random;
/**
 * Test-only {@link Engine} holder that creates {@link MockInternalEngine} instances instead of
 * plain {@link InternalEngine}s. Based on the per-index random seed, the mock engine may wrap
 * its directory readers (by default with {@link AssertingDirectoryReader}) to surface
 * reader-contract violations during integration tests.
 */
public final class MockInternalEngineHolder extends InternalEngineHolder implements Engine {

    /** Setting: probability in [0.0, 1.0] that readers are wrapped with the configured wrapper. */
    public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
    /** Setting: {@link FilterDirectoryReader} subclass used to wrap readers when wrapping is enabled. */
    public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";

    /**
     * Immutable bundle of the randomization decisions made at construction time, handed to the
     * {@link MockInternalEngine} instances this holder creates.
     */
    public static class MockContext {
        public final Random random;
        public final boolean wrapReader;
        public final Class<? extends FilterDirectoryReader> wrapper;
        public final Settings indexSettings;

        public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
            this.random = random;
            this.wrapReader = wrapReader;
            this.wrapper = wrapper;
            this.indexSettings = indexSettings;
        }
    }

    MockContext mockContext;

    @Inject
    public MockInternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
                                    IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store,
                                    SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
                                    MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
                                    CodecService codecService) throws EngineException {
        super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService,
                similarityService, codecService
        );
        // Derive all randomness from the per-index test seed so failures are reproducible.
        // Use the uppercase 'L' long-literal suffix (the original '0l' is easily misread as '01').
        final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0L);
        Random random = new Random(seed);
        final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
        Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
        boolean wrapReader = random.nextDouble() < ratio;
        if (logger.isTraceEnabled()) {
            logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
        }
        mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
    }

    /**
     * Overridden factory hook: returns a {@link MockInternalEngine} carrying the randomized
     * {@link MockContext} instead of a plain {@link InternalEngine}.
     */
    @Override
    protected InternalEngine createEngine() {
        return new MockInternalEngine(mockContext, shardId, logger, codecService, threadPool, indexingService,
                warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
                enableGcDeletes, gcDeletesInMillis,
                indexingBufferSize, codecName, compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, this);
    }

    /**
     * Base class for directory-reader wrappers used in tests. Delegates both cache keys to the
     * wrapped reader so caches keyed on the core reader remain valid across wrapping.
     */
    public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
        protected final SubReaderWrapper subReaderWrapper;

        public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) {
            super(in, subReaderWrapper);
            this.subReaderWrapper = subReaderWrapper;
        }

        @Override
        public Object getCoreCacheKey() {
            // Delegate so cache entries keyed on the unwrapped reader's core stay usable.
            return in.getCoreCacheKey();
        }

        @Override
        public Object getCombinedCoreAndDeletesKey() {
            return in.getCombinedCoreAndDeletesKey();
        }
    }
}