Make IndexingMemoryController private to IndicesService

This commit detaches the IndexingMemoryController from guice and moves
it's creation and closing into IndicesService where it logically belongs.
This commit is contained in:
Simon Willnauer 2016-01-10 20:40:51 +01:00
parent 37f6b5ced7
commit 91e8d156f5
9 changed files with 33 additions and 45 deletions

View File

@ -35,7 +35,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
@ -192,7 +192,7 @@ public final class EngineConfig {
}
/**
* Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link org.elasticsearch.indices.memory.IndexingMemoryController}
* Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link IndexingMemoryController}
*/
public ByteSizeValue getIndexingBufferSize() {
return indexingBufferSize;

View File

@ -111,7 +111,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.percolator.PercolatorService;

View File

@ -17,10 +17,9 @@
* under the License.
*/
package org.elasticsearch.indices.memory;
package org.elasticsearch.indices;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -32,16 +31,16 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> implements IndexEventListener {
public class IndexingMemoryController extends AbstractComponent implements IndexEventListener, Closeable {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@ -70,10 +69,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
/** Once a shard becomes inactive, we reduce the {@code Translog} buffer to this value (1 KB) to let active shards use the heap instead. */
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
private final ThreadPool threadPool;
private final IndicesService indicesService;
private final ByteSizeValue indexingBuffer;
@ -81,22 +76,20 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final ByteSizeValue maxShardIndexBufferSize;
private final TimeValue interval;
private volatile ScheduledFuture scheduler;
private final ScheduledFuture scheduler;
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private final ShardsIndicesStatusChecker statusChecker;
@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
}
// for testing
protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
super(settings);
this.threadPool = threadPool;
this.indicesService = indicesService;
ByteSizeValue indexingBuffer;
@ -131,29 +124,23 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
this.scheduler = scheduleTask(threadPool);
}
@Override
protected void doStart() {
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
// it's fine to run it on the scheduler thread, no busy work
this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
return threadPool.scheduleWithFixedDelay(statusChecker, interval);
}
@Override
protected void doStop() {
public void close() {
FutureUtils.cancel(scheduler);
scheduler = null;
}
@Override
protected void doClose() {
}
/**
* returns the current budget for the total amount of indexing buffers of
* active shards on this node
*/
public ByteSizeValue indexingBufferSize() {
ByteSizeValue indexingBufferSize() {
return indexingBuffer;
}
@ -188,7 +175,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
/** check if any shards active status changed, now. */
public void forceCheck() {
void forceCheck() {
statusChecker.run();
}

View File

@ -111,7 +111,6 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoverySource;
@ -273,7 +272,6 @@ public class IndicesModule extends AbstractModule {
bind(RecoverySource.class).asEagerSingleton();
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndexingMemoryController.class).asEagerSingleton();
bind(SyncedFlushService.class).asEagerSingleton();
bind(IndicesQueryCache.class).asEagerSingleton();
bind(IndicesRequestCache.class).asEagerSingleton();

View File

@ -65,6 +65,7 @@ import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Files;
@ -105,6 +106,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final OldShardsStats oldShardsStats = new OldShardsStats();
private final IndexStoreConfig indexStoreConfig;
private final MapperRegistry mapperRegistry;
private final IndexingMemoryController indexingMemoryController;
@Override
protected void doStart() {
@ -114,7 +116,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv,
ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry,
IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, MapperRegistry mapperRegistry) {
ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool) {
super(settings);
this.pluginsService = pluginsService;
this.nodeEnv = nodeEnv;
@ -127,7 +129,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
this.mapperRegistry = mapperRegistry;
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
}
@Override
@ -161,7 +163,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
@Override
protected void doClose() {
IOUtils.closeWhileHandlingException(analysisRegistry);
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController);
}
/**
@ -291,6 +293,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
indexModule.addIndexEventListener(indexingMemoryController);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}

View File

@ -63,7 +63,6 @@ import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryState;
@ -130,9 +129,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider, IndexingMemoryController indexingMemoryController) {
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) {
super(settings);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService, indexingMemoryController);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;

View File

@ -69,8 +69,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.monitor.MonitorService;
@ -249,7 +247,6 @@ public class Node implements Releasable {
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(SnapshotsService.class).start();
@ -308,7 +305,6 @@ public class Node implements Releasable {
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(IndexingMemoryController.class).stop();
injector.getInstance(IndicesTTLService.class).stop();
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
@ -360,7 +356,6 @@ public class Node implements Releasable {
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");
injector.getInstance(IndexingMemoryController.class).close();
injector.getInstance(IndicesTTLService.class).close();
injector.getInstance(IndicesService.class).close();
// close filter/fielddata caches after indices

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indices.memory;
package org.elasticsearch.indices;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.memory;
package org.elasticsearch.indices;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -24,8 +24,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.HashMap;
@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -120,6 +121,11 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
activeShards.add(shard);
forceCheck();
}
@Override
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
return null;
}
}
public void testShardAdditionAndRemoval() {