IMC is now just another IndexingOperationListener

This commit is contained in:
Michael McCandless 2016-01-11 06:07:19 -05:00 committed by mikemccand
parent f3de7783d2
commit 5e7144f5c5
8 changed files with 50 additions and 42 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
@ -241,7 +242,8 @@ public final class IndexModule {
IndexSearcherWrapper newWrapper(final IndexService indexService);
}
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry) throws IOException {
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
IndexingMemoryController indexingMemoryController) throws IOException {
final IndexSettings settings = indexSettings.newWithListener(settingsConsumers);
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
IndexEventListener eventListener = freeze();
@ -263,6 +265,6 @@ public final class IndexModule {
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
final QueryCache queryCache = queryCacheProvider.apply(settings, servicesProvider.getIndicesQueryCache());
return new IndexService(settings, environment, new SimilarityService(settings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry);
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indexingMemoryController);
}
}

View File

@ -60,6 +60,7 @@ import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.threadpool.ThreadPool;
@ -102,6 +103,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
private final IndexingSlowLog slowLog;
private final IndexingMemoryController indexingMemoryController;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService,
@ -113,7 +115,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
IndexStore indexStore,
IndexEventListener eventListener,
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry) throws IOException {
MapperRegistry mapperRegistry,
IndexingMemoryController indexingMemoryController) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.analysisService = registry.build(indexSettings);
@ -132,6 +135,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.slowLog = new IndexingSlowLog(indexSettings.getSettings());
this.indexingMemoryController = indexingMemoryController;
}
public int numberOfShards() {
@ -296,7 +300,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index
} else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog);
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog, indexingMemoryController);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

View File

@ -28,7 +28,6 @@ import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -50,10 +49,9 @@ public final class NodeServicesProvider {
private final ScriptService scriptService;
private final IndicesFieldDataCache indicesFieldDataCache;
private final CircuitBreakerService circuitBreakerService;
private final IndexingMemoryController indexingMemoryController;
@Inject
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService, IndexingMemoryController indexingMemoryController) {
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) {
this.threadPool = threadPool;
this.indicesQueryCache = indicesQueryCache;
this.termVectorsService = termVectorsService;
@ -64,7 +62,6 @@ public final class NodeServicesProvider {
this.scriptService = scriptService;
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
this.indexingMemoryController = indexingMemoryController;
}
public ThreadPool getThreadPool() {
@ -104,8 +101,4 @@ public final class NodeServicesProvider {
public CircuitBreakerService getCircuitBreakerService() {
return circuitBreakerService;
}
public IndexingMemoryController getIndexingMemoryController() {
return indexingMemoryController;
}
}

View File

@ -167,13 +167,12 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;
private final NodeServicesProvider provider;
<<<<<<< HEAD
/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh */
/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
* being indexed/deleted. */
private final AtomicLong writingBytes = new AtomicLong();
=======
>>>>>>> master
private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture;
@ -216,7 +215,6 @@ public class IndexShard extends AbstractIndexShardComponent {
* IndexingMemoryController}).
*/
private final AtomicBoolean active = new AtomicBoolean();
private final IndexingMemoryController indexingMemoryController;
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@ -273,7 +271,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.indexingMemoryController = provider.getIndexingMemoryController();
this.provider = provider;
this.searcherWrapper = indexSearcherWrapper;
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext());
@ -510,9 +507,6 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
// Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(index.getTranslogLocation().size);
indexingOperationListeners.postIndex(index);
return created;
@ -556,9 +550,6 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
// Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(delete.getTranslogLocation().size);
indexingOperationListeners.postDelete(delete);
}

View File

@ -26,10 +26,12 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
@ -46,7 +48,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class IndexingMemoryController extends AbstractComponent implements Closeable {
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@ -124,11 +126,19 @@ public class IndexingMemoryController extends AbstractComponent implements Close
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
this.scheduler = scheduleTask(threadPool);
// Need to save this so we can later launch async "write indexing buffer to disk" on shards:
this.threadPool = threadPool;
}
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
// it's fine to run it on the scheduler thread, no busy work
return threadPool.scheduleWithFixedDelay(statusChecker, interval);
if (threadPool != null) {
return threadPool.scheduleWithFixedDelay(statusChecker, interval);
} else {
// tests pass null for threadPool --> no periodic checking
return null;
}
}
@Override
@ -198,7 +208,17 @@ public class IndexingMemoryController extends AbstractComponent implements Close
shard.deactivateThrottling();
}
static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
@Override
public void postIndex(Engine.Index index) {
bytesWritten(index.getTranslogLocation().size);
}
@Override
public void postDelete(Engine.Delete delete) {
bytesWritten(delete.getTranslogLocation().size);
}
private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
final long bytesUsed;
final IndexShard shard;

View File

@ -293,14 +293,13 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
indexModule.addIndexEventListener(indexingMemoryController);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
indexModule.addIndexEventListener(oldShardsStats);
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry);
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indexingMemoryController);
boolean success = false;
try {
assert indexService.getIndexEventListener() == listener;

View File

@ -51,6 +51,7 @@ import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -59,7 +60,6 @@ import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineService;
@ -109,8 +109,7 @@ public class IndexModuleTests extends ESTestCase {
scriptEngines.addAll(Arrays.asList(scriptEngineServices));
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), new ScriptContextRegistry(Collections.emptyList()));
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, Collections.emptySet(), new NamedWriteableRegistry());
IndexingMemoryController indexingMemoryController = new IndexingMemoryController(settings, threadPool, null);
return new NodeServicesProvider(threadPool, indicesQueryCache, null, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService, indexingMemoryController);
return new NodeServicesProvider(threadPool, indicesQueryCache, null, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService);
}
@Override
@ -139,7 +138,7 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.setSearcherWrapper((s) -> new Wrapper());
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
indexService.close("simon says", false);
@ -152,7 +151,7 @@ public class IndexModuleTests extends ESTestCase {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.addIndexStore("foo_store", FooStore::new);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
assertTrue(indexService.getIndexStore() instanceof FooStore);
try {
module.addIndexStore("foo_store", FooStore::new);
@ -176,7 +175,7 @@ public class IndexModuleTests extends ESTestCase {
Consumer<Settings> listener = (s) -> {};
module.addIndexSettingsListener(listener);
module.addIndexEventListener(eventListener);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
IndexSettings x = indexService.getIndexSettings();
assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
assertEquals(x.getIndex(), index);
@ -206,7 +205,7 @@ public class IndexModuleTests extends ESTestCase {
} catch (IllegalArgumentException ex) {
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
IndexSettings x = indexService.getIndexSettings();
assertEquals(1, x.getUpdateListeners().size());
assertSame(x.getUpdateListeners().get(0), listener);
@ -233,7 +232,7 @@ public class IndexModuleTests extends ESTestCase {
}
});
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
SimilarityService similarityService = indexService.similarityService();
assertNotNull(similarityService.getSimilarity("my_similarity"));
assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity);
@ -250,7 +249,7 @@ public class IndexModuleTests extends ESTestCase {
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
} catch (IllegalArgumentException ex) {
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
}
@ -264,7 +263,7 @@ public class IndexModuleTests extends ESTestCase {
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
} catch (IllegalArgumentException ex) {
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
}
@ -311,7 +310,7 @@ public class IndexModuleTests extends ESTestCase {
assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]");
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
indexService.close("simon says", false);
}
@ -321,7 +320,7 @@ public class IndexModuleTests extends ESTestCase {
.put("path.home", createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
indexService.close("simon says", false);
}

View File

@ -95,7 +95,7 @@ import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.test.DummyShardLock;