Remove circular dependency between IndicesService and IndicesStore

This commit introduces a new IndexStoreConfig that is passed to
IndexStore instances instead it's pretty messy parent service.
This commit is contained in:
Simon Willnauer 2015-10-26 14:56:43 +01:00
parent fcc2c19600
commit cc3532f593
10 changed files with 129 additions and 103 deletions

View File

@ -36,7 +36,7 @@ public class StoreRateLimiting {
void onPause(long nanos);
}
public static enum Type {
public enum Type {
NONE,
MERGE,
ALL;

View File

@ -32,7 +32,7 @@ import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import java.util.*;
import java.util.function.BiFunction;
@ -55,19 +55,19 @@ public class IndexModule extends AbstractModule {
public static final String STORE_TYPE = "index.store.type";
public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity";
private final IndexSettings indexSettings;
private final IndicesStore indicesStore;
private final IndexStoreConfig indexStoreConfig;
// pkg private so tests can mock
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null;
private final Set<Consumer<Settings>> settingsConsumers = new HashSet<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private IndexEventListener listener;
private final Map<String, BiFunction<IndexSettings, IndicesStore, IndexStore>> storeTypes = new HashMap<>();
private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>();
private final Map<String, BiFunction<IndexSettings, IndexStoreConfig, IndexStore>> storeTypes = new HashMap<>();
public IndexModule(IndexSettings indexSettings, IndicesStore indicesStore) {
this.indicesStore = indicesStore;
public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
this.indexStoreConfig = indexStoreConfig;
this.indexSettings = indexSettings;
}
@ -134,7 +134,7 @@ public class IndexModule extends AbstractModule {
* @param type the type to register
* @param provider the instance provider / factory method
*/
public void addIndexStore(String type, BiFunction<IndexSettings, IndicesStore, IndexStore> provider) {
public void addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider) {
if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registerd");
}
@ -192,13 +192,13 @@ public class IndexModule extends AbstractModule {
final String storeType = settings.getSettings().get(STORE_TYPE);
final IndexStore store;
if (storeType == null || isBuiltinType(storeType)) {
store = new IndexStore(settings, indicesStore);
store = new IndexStore(settings, indexStoreConfig);
} else {
BiFunction<IndexSettings, IndicesStore, IndexStore> factory = storeTypes.get(storeType);
BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
store = factory.apply(settings, indicesStore);
store = factory.apply(settings, indexStoreConfig);
if (store == null) {
throw new IllegalStateException("store must not be null");
}

View File

@ -25,8 +25,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.store.IndicesStore;
/**
*
*/
@ -35,16 +33,16 @@ public class IndexStore extends AbstractIndexComponent {
public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
protected final IndicesStore indicesStore;
protected final IndexStoreConfig indexStoreConfig;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private volatile boolean nodeRateLimiting;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
public IndexStore(IndexSettings indexSettings, IndicesStore indicesStore) {
public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings);
this.indicesStore = indicesStore;
this.indexStoreConfig = indexStoreConfig;
this.rateLimitingType = indexSettings.getSettings().get(INDEX_STORE_THROTTLE_TYPE, "none");
if (rateLimitingType.equalsIgnoreCase("node")) {
@ -64,7 +62,7 @@ public class IndexStore extends AbstractIndexComponent {
* the node level one (defaults to the node level one).
*/
public StoreRateLimiting rateLimiting() {
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
return nodeRateLimiting ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
}
/**

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
/**
* IndexStoreConfig encapsulates node / cluster level configuration for index level {@link IndexStore} instances.
* For instance does it maintain the node level rate-limiter if configured. Updates to the cluster that disable or enable
* {@value #INDICES_STORE_THROTTLE_TYPE} or {@value #INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC} are reflected immediately
* on all referencing {@link IndexStore} instances
*/
public class IndexStoreConfig implements NodeSettingsService.Listener{
private static final ByteSizeValue DEFAULT_THROTTLE = new ByteSizeValue(10240, ByteSizeUnit.MB);
/**
* Configures the node / cluster level throttle type. See {@link StoreRateLimiting.Type}.
*/
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
/**
* Configures the node / cluster level throttle intensity. The default is <tt>10240 MB</tt>
*/
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ESLogger logger;
public IndexStoreConfig(Settings settings) {
logger = Loggers.getLogger(IndexStoreConfig.class, settings);
// we don't limit by default (we default to CMS's auto throttle instead):
this.rateLimitingType = settings.get("indices.store.throttle.type", StoreRateLimiting.Type.NONE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", DEFAULT_THROTTLE);
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
}
/**
* Returns the node level rate limiter
*/
public StoreRateLimiting getNodeRateLimiter(){
return rateLimiting;
}
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", this.rateLimitingType, rateLimitingType);
this.rateLimitingType = rateLimitingType;
this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", this.rateLimitingThrottle, rateLimitingThrottle, this.rateLimitingType);
this.rateLimitingThrottle = rateLimitingThrottle;
this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}

View File

@ -59,9 +59,10 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable;
@ -117,15 +118,18 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final OldShardsStats oldShardsStats = new OldShardsStats();
private final IndexStoreConfig indexStoreConfig;
@Inject
public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv) {
public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService) {
super(settings);
this.indicesAnalysisService = indicesAnalysisService;
this.injector = injector;
this.pluginsService = pluginsService;
this.nodeEnv = nodeEnv;
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.indexStoreConfig = new IndexStoreConfig(settings);
nodeSettingsService.addListener(indexStoreConfig);
}
@Override
@ -140,16 +144,13 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
Set<String> indices = new HashSet<>(this.indices.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size());
for (final String index : indices) {
indicesStopExecutor.execute(new Runnable() {
@Override
public void run() {
try {
removeIndex(index, "shutdown", false);
} catch (Throwable e) {
logger.warn("failed to remove index on stop [" + index + "]", e);
} finally {
latch.countDown();
}
indicesStopExecutor.execute(() -> {
try {
removeIndex(index, "shutdown", false);
} catch (Throwable e) {
logger.warn("failed to remove index on stop [" + index + "]", e);
} finally {
latch.countDown();
}
});
}
@ -288,7 +289,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
final IndicesStore indicesStore = injector.getInstance(IndicesStore.class); // TODO remove this circular dep!!
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, Collections.EMPTY_LIST);
Index index = new Index(indexMetaData.getIndex());
if (indices.containsKey(index.name())) {
@ -306,7 +306,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
for (Module pluginModule : pluginsService.indexModules(idxSettings.getSettings())) {
modules.add(pluginModule);
}
final IndexModule indexModule = new IndexModule(idxSettings, indicesStore);
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -31,8 +30,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexService;
@ -40,7 +37,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -57,89 +53,36 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable {
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
// TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService there is no need for a seperate public service
public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout";
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, IndicesStore.this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType);
IndicesStore.this.rateLimitingType = rateLimitingType;
IndicesStore.this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndicesStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType);
IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle;
IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
private final NodeSettingsService nodeSettingsService;
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
private TimeValue deleteShardTimeout;
@Inject
public IndicesStore(Settings settings, NodeSettingsService nodeSettingsService, IndicesService indicesService,
public IndicesStore(Settings settings, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) {
super(settings);
this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler());
// we don't limit by default (we default to CMS's auto throttle instead):
this.rateLimitingType = settings.get("indices.store.throttle.type", StoreRateLimiting.Type.NONE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
rateLimiting.setMaxRate(rateLimitingThrottle);
this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
clusterService.addLast(this);
}
IndicesStore() {
super(Settings.EMPTY);
nodeSettingsService = null;
indicesService = null;
this.clusterService = null;
this.transportService = null;
}
public StoreRateLimiting rateLimiting() {
return this.rateLimiting;
}
@Override
public void close() {
nodeSettingsService.removeListener(applySettings);
clusterService.remove(this);
}
@ -204,7 +147,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return true;
}
// TODO will have to ammend this for shadow replicas so we don't delete the shared copy...
private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size());
String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getIndexUUID();
@ -401,7 +343,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
}
public static class ShardActiveRequest extends TransportRequest {
private static class ShardActiveRequest extends TransportRequest {
protected TimeValue timeout = null;
private ClusterName clusterName;
private String indexUUID;

View File

@ -39,7 +39,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.engine.MockEngineFactory;
@ -240,8 +240,8 @@ public class IndexModuleTests extends ModuleTestCase {
public static final class FooStore extends IndexStore {
public FooStore(IndexSettings indexSettings, IndicesStore indicesStore) {
super(indexSettings, indicesStore);
public FooStore(IndexSettings indexSettings, IndexStoreConfig config) {
super(indexSettings, config);
}
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.plugins.Plugin;
import java.util.*;
@ -64,8 +64,8 @@ public class MockFSIndexStore extends IndexStore {
}
MockFSIndexStore(IndexSettings indexSettings,
IndicesStore indicesStore) {
super(indexSettings, indicesStore);
IndexStoreConfig config) {
super(indexSettings, config);
}
public DirectoryService newDirectoryService(ShardPath path) {

View File

@ -23,12 +23,12 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.index.store.IndexStoreConfig;
public class SmbMmapFsIndexStore extends IndexStore {
public SmbMmapFsIndexStore(IndexSettings indexSettings, IndicesStore indicesStore) {
super(indexSettings, indicesStore);
public SmbMmapFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings, indexStoreConfig);
}
@Override

View File

@ -23,12 +23,12 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.index.store.IndexStoreConfig;
public class SmbSimpleFsIndexStore extends IndexStore {
public SmbSimpleFsIndexStore(IndexSettings indexSettings, IndicesStore indicesStore) {
super(indexSettings, indicesStore);
public SmbSimpleFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
super(indexSettings, indexStoreConfig);
}
@Override