Merge pull request #14285 from s1monw/remove_circular_dep

Remove circular dependency between IndicesService and IndicesStore
This commit is contained in:
Simon Willnauer 2015-10-26 21:32:50 +01:00
commit 3146cd21e2
14 changed files with 140 additions and 114 deletions

View File

@ -36,7 +36,7 @@ public class StoreRateLimiting {
void onPause(long nanos); void onPause(long nanos);
} }
public static enum Type { public enum Type {
NONE, NONE,
MERGE, MERGE,
ALL; ALL;

View File

@ -80,6 +80,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
@ -158,8 +159,8 @@ public class ClusterModule extends AbstractModule {
registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP + "*", Validator.EMPTY); registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP + "*", Validator.EMPTY);
registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP + "*", Validator.EMPTY); registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP + "*", Validator.EMPTY);
registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP + "*", Validator.EMPTY); registerClusterDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP + "*", Validator.EMPTY);
registerClusterDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE, Validator.EMPTY); registerClusterDynamicSetting(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE, Validator.EMPTY);
registerClusterDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); registerClusterDynamicSetting(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME); registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME);
registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY); registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY);

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -744,7 +745,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
/** All known byte-sized cluster settings. */ /** All known byte-sized cluster settings. */
public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC,
RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE,
RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC,

View File

@ -32,7 +32,7 @@ import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
import java.util.*; import java.util.*;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -55,19 +55,19 @@ public class IndexModule extends AbstractModule {
public static final String STORE_TYPE = "index.store.type"; public static final String STORE_TYPE = "index.store.type";
public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity"; public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity";
private final IndexSettings indexSettings; private final IndexSettings indexSettings;
private final IndicesStore indicesStore; private final IndexStoreConfig indexStoreConfig;
// pkg private so tests can mock // pkg private so tests can mock
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class; Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null; Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null;
private final Set<Consumer<Settings>> settingsConsumers = new HashSet<>(); private final Set<Consumer<Settings>> settingsConsumers = new HashSet<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>(); private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private IndexEventListener listener; private IndexEventListener listener;
private final Map<String, BiFunction<IndexSettings, IndicesStore, IndexStore>> storeTypes = new HashMap<>();
private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>(); private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>();
private final Map<String, BiFunction<IndexSettings, IndexStoreConfig, IndexStore>> storeTypes = new HashMap<>();
public IndexModule(IndexSettings indexSettings, IndicesStore indicesStore) { public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
this.indicesStore = indicesStore; this.indexStoreConfig = indexStoreConfig;
this.indexSettings = indexSettings; this.indexSettings = indexSettings;
} }
@ -134,7 +134,7 @@ public class IndexModule extends AbstractModule {
* @param type the type to register * @param type the type to register
* @param provider the instance provider / factory method * @param provider the instance provider / factory method
*/ */
public void addIndexStore(String type, BiFunction<IndexSettings, IndicesStore, IndexStore> provider) { public void addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider) {
if (storeTypes.containsKey(type)) { if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registerd"); throw new IllegalArgumentException("key [" + type +"] already registerd");
} }
@ -192,13 +192,13 @@ public class IndexModule extends AbstractModule {
final String storeType = settings.getSettings().get(STORE_TYPE); final String storeType = settings.getSettings().get(STORE_TYPE);
final IndexStore store; final IndexStore store;
if (storeType == null || isBuiltinType(storeType)) { if (storeType == null || isBuiltinType(storeType)) {
store = new IndexStore(settings, indicesStore); store = new IndexStore(settings, indexStoreConfig);
} else { } else {
BiFunction<IndexSettings, IndicesStore, IndexStore> factory = storeTypes.get(storeType); BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
if (factory == null) { if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]"); throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
} }
store = factory.apply(settings, indicesStore); store = factory.apply(settings, indexStoreConfig);
if (store == null) { if (store == null) {
throw new IllegalStateException("store must not be null"); throw new IllegalStateException("store must not be null");
} }

View File

@ -25,8 +25,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.store.IndicesStore;
/** /**
* *
*/ */
@ -35,16 +33,16 @@ public class IndexStore extends AbstractIndexComponent {
public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type"; public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec"; public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
protected final IndicesStore indicesStore; protected final IndexStoreConfig indexStoreConfig;
private volatile String rateLimitingType; private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle; private volatile ByteSizeValue rateLimitingThrottle;
private volatile boolean nodeRateLimiting; private volatile boolean nodeRateLimiting;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
public IndexStore(IndexSettings indexSettings, IndicesStore indicesStore) { public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings); super(indexSettings);
this.indicesStore = indicesStore; this.indexStoreConfig = indexStoreConfig;
this.rateLimitingType = indexSettings.getSettings().get(INDEX_STORE_THROTTLE_TYPE, "none"); this.rateLimitingType = indexSettings.getSettings().get(INDEX_STORE_THROTTLE_TYPE, "none");
if (rateLimitingType.equalsIgnoreCase("node")) { if (rateLimitingType.equalsIgnoreCase("node")) {
@ -64,7 +62,7 @@ public class IndexStore extends AbstractIndexComponent {
* the node level one (defaults to the node level one). * the node level one (defaults to the node level one).
*/ */
public StoreRateLimiting rateLimiting() { public StoreRateLimiting rateLimiting() {
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting; return nodeRateLimiting ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
} }
/** /**

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
/**
* IndexStoreConfig encapsulates node / cluster level configuration for index level {@link IndexStore} instances.
* For instance it maintains the node level rate limiter configuration: updates to the cluster that disable or enable
* {@value #INDICES_STORE_THROTTLE_TYPE} or {@value #INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC} are reflected immediately
* on all referencing {@link IndexStore} instances
*/
public class IndexStoreConfig implements NodeSettingsService.Listener{
private static final ByteSizeValue DEFAULT_THROTTLE = new ByteSizeValue(10240, ByteSizeUnit.MB);
/**
* Configures the node / cluster level throttle type. See {@link StoreRateLimiting.Type}.
*/
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
/**
* Configures the node / cluster level throttle intensity. The default is <tt>10240 MB</tt>
*/
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ESLogger logger;
public IndexStoreConfig(Settings settings) {
logger = Loggers.getLogger(IndexStoreConfig.class, settings);
// we don't limit by default (we default to CMS's auto throttle instead):
this.rateLimitingType = settings.get("indices.store.throttle.type", StoreRateLimiting.Type.NONE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", DEFAULT_THROTTLE);
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
}
/**
* Returns the node level rate limiter
*/
public StoreRateLimiting getNodeRateLimiter(){
return rateLimiting;
}
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", this.rateLimitingType, rateLimitingType);
this.rateLimitingType = rateLimitingType;
this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", this.rateLimitingThrottle, rateLimitingThrottle, this.rateLimitingType);
this.rateLimitingThrottle = rateLimitingThrottle;
this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}

View File

@ -59,9 +59,10 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable; import java.io.Closeable;
@ -117,15 +118,18 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>(); private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final OldShardsStats oldShardsStats = new OldShardsStats(); private final OldShardsStats oldShardsStats = new OldShardsStats();
private final IndexStoreConfig indexStoreConfig;
@Inject @Inject
public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv) { public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService) {
super(settings); super(settings);
this.indicesAnalysisService = indicesAnalysisService; this.indicesAnalysisService = indicesAnalysisService;
this.injector = injector; this.injector = injector;
this.pluginsService = pluginsService; this.pluginsService = pluginsService;
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.indexStoreConfig = new IndexStoreConfig(settings);
nodeSettingsService.addListener(indexStoreConfig);
} }
@Override @Override
@ -140,16 +144,13 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
Set<String> indices = new HashSet<>(this.indices.keySet()); Set<String> indices = new HashSet<>(this.indices.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size()); final CountDownLatch latch = new CountDownLatch(indices.size());
for (final String index : indices) { for (final String index : indices) {
indicesStopExecutor.execute(new Runnable() { indicesStopExecutor.execute(() -> {
@Override try {
public void run() { removeIndex(index, "shutdown", false);
try { } catch (Throwable e) {
removeIndex(index, "shutdown", false); logger.warn("failed to remove index on stop [" + index + "]", e);
} catch (Throwable e) { } finally {
logger.warn("failed to remove index on stop [" + index + "]", e); latch.countDown();
} finally {
latch.countDown();
}
} }
}); });
} }
@ -288,7 +289,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed"); throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
} }
final IndicesStore indicesStore = injector.getInstance(IndicesStore.class); // TODO remove this circular dep!!
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, Collections.EMPTY_LIST); final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, Collections.EMPTY_LIST);
Index index = new Index(indexMetaData.getIndex()); Index index = new Index(indexMetaData.getIndex());
if (indices.containsKey(index.name())) { if (indices.containsKey(index.name())) {
@ -306,7 +306,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
for (Module pluginModule : pluginsService.indexModules(idxSettings.getSettings())) { for (Module pluginModule : pluginsService.indexModules(idxSettings.getSettings())) {
modules.add(pluginModule); modules.add(pluginModule);
} }
final IndexModule indexModule = new IndexModule(idxSettings, indicesStore); final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig);
for (IndexEventListener listener : builtInListeners) { for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener); indexModule.addIndexEventListener(listener);
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -31,8 +30,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
@ -40,7 +37,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
@ -57,89 +53,36 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable { public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable {
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type"; // TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService there is no need for a seperate public service
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout"; public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout";
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, IndicesStore.this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType);
IndicesStore.this.rateLimitingType = rateLimitingType;
IndicesStore.this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndicesStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType);
IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle;
IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
private final NodeSettingsService nodeSettingsService;
private final IndicesService indicesService; private final IndicesService indicesService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportService transportService; private final TransportService transportService;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
private TimeValue deleteShardTimeout; private TimeValue deleteShardTimeout;
@Inject @Inject
public IndicesStore(Settings settings, NodeSettingsService nodeSettingsService, IndicesService indicesService, public IndicesStore(Settings settings, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) { ClusterService clusterService, TransportService transportService) {
super(settings); super(settings);
this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler()); transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler());
// we don't limit by default (we default to CMS's auto throttle instead):
this.rateLimitingType = settings.get("indices.store.throttle.type", StoreRateLimiting.Type.NONE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
rateLimiting.setMaxRate(rateLimitingThrottle);
this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)); this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
clusterService.addLast(this); clusterService.addLast(this);
} }
IndicesStore() { IndicesStore() {
super(Settings.EMPTY); super(Settings.EMPTY);
nodeSettingsService = null;
indicesService = null; indicesService = null;
this.clusterService = null; this.clusterService = null;
this.transportService = null; this.transportService = null;
} }
public StoreRateLimiting rateLimiting() {
return this.rateLimiting;
}
@Override @Override
public void close() { public void close() {
nodeSettingsService.removeListener(applySettings);
clusterService.remove(this); clusterService.remove(this);
} }
@ -204,7 +147,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return true; return true;
} }
// TODO will have to ammend this for shadow replicas so we don't delete the shared copy...
private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) { private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size()); List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size());
String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getIndexUUID(); String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getIndexUUID();
@ -401,7 +343,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} }
} }
public static class ShardActiveRequest extends TransportRequest { private static class ShardActiveRequest extends TransportRequest {
protected TimeValue timeout = null; protected TimeValue timeout = null;
private ClusterName clusterName; private ClusterName clusterName;
private String indexUUID; private String indexUUID;

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -59,7 +59,7 @@ public class ClusterSettingsIT extends ESIntegTestCase {
} }
public void testClusterSettingsUpdateResponse() { public void testClusterSettingsUpdateResponse() {
String key1 = IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC; String key1 = IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC;
int value1 = 10; int value1 = 10;
String key2 = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE; String key2 = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE;

View File

@ -39,7 +39,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.test.engine.MockEngineFactory;
@ -240,8 +240,8 @@ public class IndexModuleTests extends ModuleTestCase {
public static final class FooStore extends IndexStore { public static final class FooStore extends IndexStore {
public FooStore(IndexSettings indexSettings, IndicesStore indicesStore) { public FooStore(IndexSettings indexSettings, IndexStoreConfig config) {
super(indexSettings, indicesStore); super(indexSettings, config);
} }
} }

View File

@ -66,17 +66,15 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.cache.IndexCacheModule; import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.MockNode; import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.node.NodeMocksPlugin;
@ -457,13 +455,13 @@ public final class InternalTestCluster extends TestCluster {
if (random.nextBoolean()) { if (random.nextBoolean()) {
if (random.nextInt(10) == 0) { // do something crazy slow here if (random.nextInt(10) == 0) { // do something crazy slow here
builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB)); builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
} else { } else {
builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 10, 200), ByteSizeUnit.MB)); builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 10, 200), ByteSizeUnit.MB));
} }
} }
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(IndicesStore.INDICES_STORE_THROTTLE_TYPE, RandomPicks.randomFrom(random, StoreRateLimiting.Type.values())); builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE, RandomPicks.randomFrom(random, StoreRateLimiting.Type.values()));
} }
if (random.nextBoolean()) { if (random.nextBoolean()) {

View File

@ -29,7 +29,7 @@ import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import java.util.*; import java.util.*;
@ -64,8 +64,8 @@ public class MockFSIndexStore extends IndexStore {
} }
MockFSIndexStore(IndexSettings indexSettings, MockFSIndexStore(IndexSettings indexSettings,
IndicesStore indicesStore) { IndexStoreConfig config) {
super(indexSettings, indicesStore); super(indexSettings, config);
} }
public DirectoryService newDirectoryService(ShardPath path) { public DirectoryService newDirectoryService(ShardPath path) {

View File

@ -23,12 +23,12 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
public class SmbMmapFsIndexStore extends IndexStore { public class SmbMmapFsIndexStore extends IndexStore {
public SmbMmapFsIndexStore(IndexSettings indexSettings, IndicesStore indicesStore) { public SmbMmapFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings, indicesStore); super(indexSettings, indexStoreConfig);
} }
@Override @Override

View File

@ -23,12 +23,12 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.index.store.IndexStoreConfig;
public class SmbSimpleFsIndexStore extends IndexStore { public class SmbSimpleFsIndexStore extends IndexStore {
public SmbSimpleFsIndexStore(IndexSettings indexSettings, IndicesStore indicesStore) { public SmbSimpleFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings, indicesStore); super(indexSettings, indexStoreConfig);
} }
@Override @Override