decouple IndexingMemoryController from IndexShard

This commit is contained in:
Simon Willnauer 2015-11-04 15:19:52 +01:00
parent 487af301ea
commit a311491c8e
13 changed files with 45 additions and 31 deletions

View File

@ -125,6 +125,18 @@ final class CompositeIndexEventListener implements IndexEventListener {
}
}
@Override
public void onShardActive(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.onShardActive(indexShard);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke on shard active callback", t, indexShard.shardId().getId());
throw t;
}
}
}
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
for (IndexEventListener listener : listeners) {

View File

@ -68,7 +68,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
/**
*
*/
public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable<IndexShard>{
public final class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable<IndexShard>{
private final IndexEventListener eventListener;
private final AnalysisService analysisService;
@ -538,15 +538,15 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
final EngineFactory getEngineFactory() {
return engineFactory;
}
} // pkg private for testing
final IndexSearcherWrapper getSearcherWrapper() {
return searcherWrapper;
}
} // pkg private for testing
final IndexStore getIndexStore() {
return indexStore;
}
} // pkg private for testing
}

View File

@ -45,7 +45,6 @@ public final class NodeServicesProvider {
private final TermVectorsService termVectorsService;
private final IndicesWarmer warmer;
private final BigArrays bigArrays;
private final IndexingMemoryController indexingMemoryController;
private final Client client;
private final IndicesQueriesRegistry indicesQueriesRegistry;
private final ScriptService scriptService;
@ -53,13 +52,12 @@ public final class NodeServicesProvider {
private final CircuitBreakerService circuitBreakerService;
@Inject
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, IndexingMemoryController indexingMemoryController, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) {
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) {
this.threadPool = threadPool;
this.indicesQueryCache = indicesQueryCache;
this.termVectorsService = termVectorsService;
this.warmer = warmer;
this.bigArrays = bigArrays;
this.indexingMemoryController = indexingMemoryController;
this.client = client;
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.scriptService = scriptService;
@ -97,10 +95,6 @@ public final class NodeServicesProvider {
return scriptService;
}
public IndexingMemoryController getIndexingMemoryController() {
return indexingMemoryController;
}
public IndicesFieldDataCache getIndicesFieldDataCache() {
return indicesFieldDataCache;
}

View File

@ -23,7 +23,6 @@ import org.apache.lucene.util.Accountable;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.plain.BytesBinaryDVIndexFieldData;
@ -159,7 +158,6 @@ public class IndexFieldDataService extends AbstractIndexComponent implements Clo
private volatile IndexFieldDataCache.Listener listener = DEFAULT_NOOP_LISTENER;
@Inject
public IndexFieldDataService(IndexSettings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService, MapperService mapperService) {
super(indexSettings);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@ -105,7 +104,6 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
private volatile Set<String> parentTypes = emptySet();
@Inject
public MapperService(IndexSettings indexSettings, AnalysisService analysisService,
SimilarityService similarityService) {
super(indexSettings);

View File

@ -89,6 +89,13 @@ public interface IndexEventListener {
*/
default void onShardInactive(IndexShard indexShard) {}
/**
* Called when a shard is marked as active ie. was previously inactive and is now active again.
*
* @param indexShard The shard that was marked active
*/
default void onShardActive(IndexShard indexShard) {}
/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes

View File

@ -198,8 +198,6 @@ public class IndexShard extends AbstractIndexShardComponent {
* IndexingMemoryController}). */
private final AtomicBoolean active = new AtomicBoolean();
private final IndexingMemoryController indexingMemoryController;
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
@ -248,7 +246,6 @@ public class IndexShard extends AbstractIndexShardComponent {
cachingPolicy = new UsageTrackingQueryCachingPolicy();
}
this.indexingMemoryController = provider.getIndexingMemoryController();
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdOperations = this.indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = this.indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
@ -921,7 +918,7 @@ public class IndexShard extends AbstractIndexShardComponent {
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
indexingMemoryController.forceCheck();
indexEventListener.onShardActive(this);
}
}
@ -1468,7 +1465,7 @@ public class IndexShard extends AbstractIndexShardComponent {
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexingMemoryController.getInactiveTime());
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexSettings.getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
}
private static class IndexShardOperationCounter extends AbstractRefCounted {

View File

@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
public class SimilarityService extends AbstractIndexComponent {
public final class SimilarityService extends AbstractIndexComponent {
public final static String DEFAULT_SIMILARITY = "default";
private final Similarity defaultSimilarity;

View File

@ -63,7 +63,6 @@ public final class IndicesWarmer extends AbstractComponent {
if (shard.state() == IndexShardState.CLOSED) {
return;
}
final IndexMetaData indexMetaData = settings.getIndexMetaData();
final Settings indexSettings = settings.getSettings();
if (!indexSettings.getAsBoolean(INDEX_WARMER_ENABLED, settings.getNodeSettings().getAsBoolean(INDEX_WARMER_ENABLED, true))) {
return;

View File

@ -53,6 +53,7 @@ import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryState;
@ -112,9 +113,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) {
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider, IndexingMemoryController indexingMemoryController) {
super(settings);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService, indexingMemoryController);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@ -40,7 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> implements IndexEventListener {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@ -426,4 +427,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
public TimeValue getInactiveTime() {
return inactiveTime;
}
@Override
public void onShardActive(IndexShard indexShard) {
forceCheck();
}
}

View File

@ -42,8 +42,6 @@ import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.cache.query.none.NoneQueryCache;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@ -106,7 +104,7 @@ public class IndexModuleTests extends ModuleTestCase {
scriptEngines.addAll(Arrays.asList(scriptEngineServices));
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), new ScriptContextRegistry(Collections.EMPTY_LIST));
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, Collections.EMPTY_SET, new NamedWriteableRegistry());
return new NodeServicesProvider(threadPool, indicesQueryCache, null, warmer, bigArrays, null, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService);
return new NodeServicesProvider(threadPool, indicesQueryCache, null, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService);
}
@Override

View File

@ -36,8 +36,9 @@ public class SMBStorePlugin extends Plugin {
return "SMB Store Plugin";
}
public void onModule(IndexModule storeModule) {
storeModule.addIndexStore("smb_mmap_fs", SmbMmapFsIndexStore::new);
storeModule.addIndexStore("smb_simple_fs", SmbSimpleFsIndexStore::new);
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexStore("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexModule.addIndexStore("smb_simple_fs", SmbSimpleFsIndexStore::new);
}
}