Remove store throttling. (#21573)
Store throttling has been disabled by default since Lucene added automatic throttling of merge operations based on the indexing rate.
This commit is contained in:
parent
d3b444ad8a
commit
6581b77198
|
@ -1,106 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import org.apache.lucene.store.IOContext.Context;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public final class RateLimitedFSDirectory extends FilterDirectory {
|
||||
|
||||
private final StoreRateLimiting.Provider rateLimitingProvider;
|
||||
|
||||
private final StoreRateLimiting.Listener rateListener;
|
||||
|
||||
public RateLimitedFSDirectory(Directory wrapped, StoreRateLimiting.Provider rateLimitingProvider,
|
||||
StoreRateLimiting.Listener rateListener) {
|
||||
super(wrapped);
|
||||
this.rateLimitingProvider = rateLimitingProvider;
|
||||
this.rateListener = rateListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
final IndexOutput output = in.createOutput(name, context);
|
||||
|
||||
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
|
||||
StoreRateLimiting.Type type = rateLimiting.getType();
|
||||
RateLimiter limiter = rateLimiting.getRateLimiter();
|
||||
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
|
||||
return output;
|
||||
}
|
||||
if (context.context == Context.MERGE || type == StoreRateLimiting.Type.ALL) {
|
||||
// we are merging, and type is either MERGE or ALL, rate limit...
|
||||
return new RateLimitedIndexOutput(new RateLimiterWrapper(limiter, rateListener), output);
|
||||
}
|
||||
// we shouldn't really get here...
|
||||
return output;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
|
||||
StoreRateLimiting.Type type = rateLimiting.getType();
|
||||
RateLimiter limiter = rateLimiting.getRateLimiter();
|
||||
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
|
||||
return StoreUtils.toString(in);
|
||||
} else {
|
||||
return "rate_limited(" + StoreUtils.toString(in) + ", type=" + type.name() + ", rate=" + limiter.getMBPerSec() + ")";
|
||||
}
|
||||
}
|
||||
|
||||
// we wrap the limiter to notify our store if we limited to get statistics
|
||||
static final class RateLimiterWrapper extends RateLimiter {
|
||||
private final RateLimiter delegate;
|
||||
private final StoreRateLimiting.Listener rateListener;
|
||||
|
||||
RateLimiterWrapper(RateLimiter delegate, StoreRateLimiting.Listener rateListener) {
|
||||
this.delegate = delegate;
|
||||
this.rateListener = rateListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMBPerSec(double mbPerSec) {
|
||||
delegate.setMBPerSec(mbPerSec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getMBPerSec() {
|
||||
return delegate.getMBPerSec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pause(long bytes) throws IOException {
|
||||
long pause = delegate.pause(bytes);
|
||||
rateListener.onPause(pause);
|
||||
return pause;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinPauseCheckBytes() {
|
||||
return delegate.getMinPauseCheckBytes();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,91 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
public class StoreRateLimiting {
|
||||
|
||||
public interface Provider {
|
||||
|
||||
StoreRateLimiting rateLimiting();
|
||||
}
|
||||
|
||||
public interface Listener {
|
||||
|
||||
void onPause(long nanos);
|
||||
}
|
||||
|
||||
public enum Type {
|
||||
NONE,
|
||||
MERGE,
|
||||
ALL;
|
||||
|
||||
public static Type fromString(String type) {
|
||||
if ("none".equalsIgnoreCase(type)) {
|
||||
return NONE;
|
||||
} else if ("merge".equalsIgnoreCase(type)) {
|
||||
return MERGE;
|
||||
} else if ("all".equalsIgnoreCase(type)) {
|
||||
return ALL;
|
||||
}
|
||||
throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none]");
|
||||
}
|
||||
}
|
||||
|
||||
private final SimpleRateLimiter rateLimiter = new SimpleRateLimiter(0);
|
||||
private volatile SimpleRateLimiter actualRateLimiter;
|
||||
|
||||
private volatile Type type;
|
||||
|
||||
public StoreRateLimiting() {
|
||||
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public RateLimiter getRateLimiter() {
|
||||
return actualRateLimiter;
|
||||
}
|
||||
|
||||
public void setMaxRate(ByteSizeValue rate) {
|
||||
if (rate.getBytes() <= 0) {
|
||||
actualRateLimiter = null;
|
||||
} else if (actualRateLimiter == null) {
|
||||
actualRateLimiter = rateLimiter;
|
||||
actualRateLimiter.setMBPerSec(rate.getMbFrac());
|
||||
} else {
|
||||
assert rateLimiter == actualRateLimiter;
|
||||
rateLimiter.setMBPerSec(rate.getMbFrac());
|
||||
}
|
||||
}
|
||||
|
||||
public Type getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = Type.fromString(type);
|
||||
}
|
||||
}
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public final class StoreUtils {
|
||||
|
||||
private StoreUtils() {
|
||||
|
||||
}
|
||||
|
||||
public static String toString(Directory directory) {
|
||||
if (directory instanceof NIOFSDirectory) {
|
||||
NIOFSDirectory niofsDirectory = (NIOFSDirectory)directory;
|
||||
return "niofs(" + niofsDirectory.getDirectory() + ")";
|
||||
}
|
||||
if (directory instanceof MMapDirectory) {
|
||||
MMapDirectory mMapDirectory = (MMapDirectory)directory;
|
||||
return "mmapfs(" + mMapDirectory.getDirectory() + ")";
|
||||
}
|
||||
if (directory instanceof SimpleFSDirectory) {
|
||||
SimpleFSDirectory simpleFSDirectory = (SimpleFSDirectory)directory;
|
||||
return "simplefs(" + simpleFSDirectory.getDirectory() + ")";
|
||||
}
|
||||
if (directory instanceof FileSwitchDirectory) {
|
||||
FileSwitchDirectory fileSwitchDirectory = (FileSwitchDirectory) directory;
|
||||
return "default(" + toString(fileSwitchDirectory.getPrimaryDir()) + "," + toString(fileSwitchDirectory.getSecondaryDir()) + ")";
|
||||
}
|
||||
|
||||
return directory.toString();
|
||||
}
|
||||
|
||||
public static String toString(Directory[] directories) {
|
||||
String[] strings = new String[directories.length];
|
||||
for(int i=0;i<directories.length;i++) {
|
||||
strings[i] = toString(directories[i]);
|
||||
}
|
||||
return Arrays.toString(strings);
|
||||
}
|
||||
}
|
|
@ -54,7 +54,6 @@ import org.elasticsearch.discovery.DiscoverySettings;
|
|||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
import org.elasticsearch.ingest.IngestMetadata;
|
||||
|
@ -757,7 +756,6 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
|
|||
|
||||
/** All known byte-sized cluster settings. */
|
||||
public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
|
||||
IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(),
|
||||
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()));
|
||||
|
||||
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDeci
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
@ -64,7 +63,6 @@ import org.elasticsearch.gateway.GatewayService;
|
|||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||
import org.elasticsearch.http.HttpTransportSettings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndexingMemoryController;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
|
@ -183,8 +181,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
FsRepository.REPOSITORIES_CHUNK_SIZE_SETTING,
|
||||
FsRepository.REPOSITORIES_COMPRESS_SETTING,
|
||||
FsRepository.REPOSITORIES_LOCATION_SETTING,
|
||||
IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING,
|
||||
IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
|
||||
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
|
||||
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
|
||||
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.index.mapper.MapperService;
|
|||
import org.elasticsearch.index.seqno.LocalCheckpointService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.FsDirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
|
||||
|
@ -61,8 +60,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY,
|
||||
IndexSettings.INDEX_TTL_DISABLE_PURGE_SETTING,
|
||||
IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING,
|
||||
IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
|
||||
MergeSchedulerConfig.AUTO_THROTTLE_SETTING,
|
||||
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
|
||||
MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.index.similarity.BM25SimilarityProvider;
|
|||
import org.elasticsearch.index.similarity.SimilarityProvider;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
|
@ -73,7 +72,7 @@ import java.util.function.Function;
|
|||
* {@link #addSimilarity(String, BiFunction)}while existing Providers can be referenced through Settings under the
|
||||
* {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
|
||||
* {@link BM25SimilarityProvider}, the configuration <tt>"index.similarity.my_similarity.type : "BM25"</tt> can be used.</li>
|
||||
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, BiFunction)}</li>
|
||||
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, Function)}</li>
|
||||
* <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
|
||||
* {@link #addIndexEventListener(IndexEventListener)}</li>
|
||||
* <li>Settings update listener - Custom settings update listener can be registered via
|
||||
|
@ -108,21 +107,19 @@ public final class IndexModule {
|
|||
Setting.boolSetting("index.queries.cache.term_queries", false, Property.IndexScope);
|
||||
|
||||
private final IndexSettings indexSettings;
|
||||
private final IndexStoreConfig indexStoreConfig;
|
||||
private final AnalysisRegistry analysisRegistry;
|
||||
// pkg private so tests can mock
|
||||
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
|
||||
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
|
||||
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
|
||||
private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>();
|
||||
private final Map<String, BiFunction<IndexSettings, IndexStoreConfig, IndexStore>> storeTypes = new HashMap<>();
|
||||
private final Map<String, Function<IndexSettings, IndexStore>> storeTypes = new HashMap<>();
|
||||
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
|
||||
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
|
||||
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
|
||||
private final AtomicBoolean frozen = new AtomicBoolean(false);
|
||||
|
||||
public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig, AnalysisRegistry analysisRegistry) {
|
||||
this.indexStoreConfig = indexStoreConfig;
|
||||
public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisRegistry = analysisRegistry;
|
||||
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
|
||||
|
@ -244,7 +241,7 @@ public final class IndexModule {
|
|||
* @param type the type to register
|
||||
* @param provider the instance provider / factory method
|
||||
*/
|
||||
public void addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider) {
|
||||
public void addIndexStore(String type, Function<IndexSettings, IndexStore> provider) {
|
||||
ensureNotFrozen();
|
||||
if (storeTypes.containsKey(type)) {
|
||||
throw new IllegalArgumentException("key [" + type +"] already registered");
|
||||
|
@ -345,20 +342,17 @@ public final class IndexModule {
|
|||
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
|
||||
final IndexStore store;
|
||||
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {
|
||||
store = new IndexStore(indexSettings, indexStoreConfig);
|
||||
store = new IndexStore(indexSettings);
|
||||
} else {
|
||||
BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
|
||||
Function<IndexSettings, IndexStore> factory = storeTypes.get(storeType);
|
||||
if (factory == null) {
|
||||
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
|
||||
}
|
||||
store = factory.apply(indexSettings, indexStoreConfig);
|
||||
store = factory.apply(indexSettings);
|
||||
if (store == null) {
|
||||
throw new IllegalStateException("store must not be null");
|
||||
}
|
||||
}
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
|
||||
store::setMaxRate);
|
||||
final QueryCache queryCache;
|
||||
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
|
||||
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
|
||||
|
|
|
@ -32,7 +32,5 @@ public abstract class DirectoryService extends AbstractIndexShardComponent {
|
|||
super(shardId, indexSettings);
|
||||
}
|
||||
|
||||
public abstract long throttleTimeInNanos();
|
||||
|
||||
public abstract Directory newDirectory() throws IOException;
|
||||
}
|
|
@ -26,14 +26,11 @@ import org.apache.lucene.store.LockFactory;
|
|||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.NativeFSLockFactory;
|
||||
import org.apache.lucene.store.RateLimitedFSDirectory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.store.SimpleFSLockFactory;
|
||||
import org.apache.lucene.store.SleepingLockWrapper;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
|
@ -46,7 +43,7 @@ import java.nio.file.Path;
|
|||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider {
|
||||
public class FsDirectoryService extends DirectoryService {
|
||||
|
||||
protected final IndexStore indexStore;
|
||||
public static final Setting<LockFactory> INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> {
|
||||
|
@ -60,7 +57,6 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
|
|||
} // can we set on both - node and index level, some nodes might be running on NFS so they might need simple rather than native
|
||||
}, Property.IndexScope, Property.NodeScope);
|
||||
|
||||
private final CounterMetric rateLimitingTimeInNanos = new CounterMetric();
|
||||
private final ShardPath path;
|
||||
|
||||
@Inject
|
||||
|
@ -70,16 +66,6 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
|
|||
this.indexStore = indexStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return rateLimitingTimeInNanos.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return indexStore.rateLimiting();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
final Path location = path.resolveIndex();
|
||||
|
@ -92,12 +78,7 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
|
|||
if (IndexMetaData.isOnSharedFilesystem(indexSettings.getSettings())) {
|
||||
wrapped = new SleepingLockWrapper(wrapped, 5000);
|
||||
}
|
||||
return new RateLimitedFSDirectory(wrapped, this, this) ;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPause(long nanos) {
|
||||
rateLimitingTimeInNanos.inc(nanos);
|
||||
return wrapped;
|
||||
}
|
||||
|
||||
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
|
||||
|
|
|
@ -19,40 +19,14 @@
|
|||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
|
||||
public class IndexStore extends AbstractIndexComponent {
|
||||
public static final Setting<IndexRateLimitingType> INDEX_STORE_THROTTLE_TYPE_SETTING =
|
||||
new Setting<>("index.store.throttle.type", "none", IndexRateLimitingType::fromString,
|
||||
Property.Dynamic, Property.IndexScope);
|
||||
public static final Setting<ByteSizeValue> INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING =
|
||||
Setting.byteSizeSetting("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0),
|
||||
Property.Dynamic, Property.IndexScope);
|
||||
|
||||
protected final IndexStoreConfig indexStoreConfig;
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
private volatile IndexRateLimitingType type;
|
||||
|
||||
public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
|
||||
public IndexStore(IndexSettings indexSettings) {
|
||||
super(indexSettings);
|
||||
this.indexStoreConfig = indexStoreConfig;
|
||||
setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING));
|
||||
rateLimiting.setMaxRate(indexSettings.getValue(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING));
|
||||
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimiting.getType(), rateLimiting.getRateLimiter());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the rate limiting, either of the index is explicitly configured, or
|
||||
* the node level one (defaults to the node level one).
|
||||
*/
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return type.useStoreLimiter() ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -62,44 +36,4 @@ public class IndexStore extends AbstractIndexComponent {
|
|||
return new FsDirectoryService(indexSettings, this, path);
|
||||
}
|
||||
|
||||
public void setType(IndexRateLimitingType type) {
|
||||
this.type = type;
|
||||
if (type.useStoreLimiter() == false) {
|
||||
rateLimiting.setType(type.type);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxRate(ByteSizeValue rate) {
|
||||
rateLimiting.setMaxRate(rate);
|
||||
}
|
||||
|
||||
/**
|
||||
* On an index level we can configure all of {@link org.apache.lucene.store.StoreRateLimiting.Type} as well as
|
||||
* <tt>node</tt> which will then use a global rate limiter that has it's own configuration. The global one is
|
||||
* configured in {@link IndexStoreConfig} which is managed by the per-node {@link org.elasticsearch.indices.IndicesService}
|
||||
*/
|
||||
public static final class IndexRateLimitingType {
|
||||
private final StoreRateLimiting.Type type;
|
||||
|
||||
private IndexRateLimitingType(StoreRateLimiting.Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
private boolean useStoreLimiter() {
|
||||
return type == null;
|
||||
}
|
||||
|
||||
static IndexRateLimitingType fromString(String type) {
|
||||
if ("node".equalsIgnoreCase(type)) {
|
||||
return new IndexRateLimitingType(null);
|
||||
} else {
|
||||
try {
|
||||
return new IndexRateLimitingType(StoreRateLimiting.Type.fromString(type));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none|node]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
/**
|
||||
* IndexStoreConfig encapsulates node / cluster level configuration for index level {@link IndexStore} instances.
|
||||
* For instance it maintains the node level rate limiter configuration: updates to the cluster that disable or enable
|
||||
* <tt>indices.store.throttle.type</tt> or <tt>indices.store.throttle.max_bytes_per_sec</tt> are reflected immediately
|
||||
* on all referencing {@link IndexStore} instances
|
||||
*/
|
||||
public class IndexStoreConfig {
|
||||
|
||||
/**
|
||||
* Configures the node / cluster level throttle type. See {@link StoreRateLimiting.Type}.
|
||||
*/
|
||||
public static final Setting<StoreRateLimiting.Type> INDICES_STORE_THROTTLE_TYPE_SETTING =
|
||||
new Setting<>("indices.store.throttle.type", StoreRateLimiting.Type.NONE.name(),StoreRateLimiting.Type::fromString,
|
||||
Property.Dynamic, Property.NodeScope);
|
||||
/**
|
||||
* Configures the node / cluster level throttle intensity. The default is <tt>10240 MB</tt>
|
||||
*/
|
||||
public static final Setting<ByteSizeValue> INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING =
|
||||
Setting.byteSizeSetting("indices.store.throttle.max_bytes_per_sec", new ByteSizeValue(0),
|
||||
Property.Dynamic, Property.NodeScope);
|
||||
private volatile StoreRateLimiting.Type rateLimitingType;
|
||||
private volatile ByteSizeValue rateLimitingThrottle;
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
private final Logger logger;
|
||||
public IndexStoreConfig(Settings settings) {
|
||||
logger = Loggers.getLogger(IndexStoreConfig.class, settings);
|
||||
// we don't limit by default (we default to CMS's auto throttle instead):
|
||||
this.rateLimitingType = INDICES_STORE_THROTTLE_TYPE_SETTING.get(settings);
|
||||
rateLimiting.setType(rateLimitingType);
|
||||
this.rateLimitingThrottle = INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.get(settings);
|
||||
rateLimiting.setMaxRate(rateLimitingThrottle);
|
||||
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the node level rate limiter
|
||||
*/
|
||||
public StoreRateLimiting getNodeRateLimiter(){
|
||||
return rateLimiting;
|
||||
}
|
||||
|
||||
public void setRateLimitingType(StoreRateLimiting.Type rateLimitingType) {
|
||||
this.rateLimitingType = rateLimitingType;
|
||||
rateLimiting.setType(rateLimitingType);
|
||||
}
|
||||
|
||||
public void setRateLimitingThrottle(ByteSizeValue rateLimitingThrottle) {
|
||||
this.rateLimitingThrottle = rateLimitingThrottle;
|
||||
}
|
||||
}
|
|
@ -159,7 +159,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
this.shardLock = shardLock;
|
||||
this.onClose = onClose;
|
||||
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
|
||||
this.statsCache = new StoreStatsCache(refreshInterval, directory, directoryService);
|
||||
this.statsCache = new StoreStatsCache(refreshInterval, directory);
|
||||
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
|
||||
|
||||
assert onClose != null;
|
||||
|
@ -1351,18 +1351,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
|
||||
private final Directory directory;
|
||||
private final DirectoryService directoryService;
|
||||
|
||||
public StoreStatsCache(TimeValue refreshInterval, Directory directory, DirectoryService directoryService) throws IOException {
|
||||
super(refreshInterval, new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos()));
|
||||
public StoreStatsCache(TimeValue refreshInterval, Directory directory) throws IOException {
|
||||
super(refreshInterval, new StoreStats(estimateSize(directory)));
|
||||
this.directory = directory;
|
||||
this.directoryService = directoryService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StoreStats refresh() {
|
||||
try {
|
||||
return new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos());
|
||||
return new StoreStats(estimateSize(directory));
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to refresh store stats", ex);
|
||||
}
|
||||
|
|
|
@ -19,11 +19,11 @@
|
|||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
|
@ -33,15 +33,12 @@ public class StoreStats implements Streamable, ToXContent {
|
|||
|
||||
private long sizeInBytes;
|
||||
|
||||
private long throttleTimeInNanos;
|
||||
|
||||
public StoreStats() {
|
||||
|
||||
}
|
||||
|
||||
public StoreStats(long sizeInBytes, long throttleTimeInNanos) {
|
||||
public StoreStats(long sizeInBytes) {
|
||||
this.sizeInBytes = sizeInBytes;
|
||||
this.throttleTimeInNanos = throttleTimeInNanos;
|
||||
}
|
||||
|
||||
public void add(StoreStats stats) {
|
||||
|
@ -49,7 +46,6 @@ public class StoreStats implements Streamable, ToXContent {
|
|||
return;
|
||||
}
|
||||
sizeInBytes += stats.sizeInBytes;
|
||||
throttleTimeInNanos += stats.throttleTimeInNanos;
|
||||
}
|
||||
|
||||
|
||||
|
@ -69,14 +65,6 @@ public class StoreStats implements Streamable, ToXContent {
|
|||
return size();
|
||||
}
|
||||
|
||||
public TimeValue throttleTime() {
|
||||
return TimeValue.timeValueNanos(throttleTimeInNanos);
|
||||
}
|
||||
|
||||
public TimeValue getThrottleTime() {
|
||||
return throttleTime();
|
||||
}
|
||||
|
||||
public static StoreStats readStoreStats(StreamInput in) throws IOException {
|
||||
StoreStats store = new StoreStats();
|
||||
store.readFrom(in);
|
||||
|
@ -86,20 +74,23 @@ public class StoreStats implements Streamable, ToXContent {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
sizeInBytes = in.readVLong();
|
||||
throttleTimeInNanos = in.readVLong();
|
||||
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
|
||||
in.readVLong(); // throttleTimeInNanos
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(sizeInBytes);
|
||||
out.writeVLong(throttleTimeInNanos);
|
||||
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
|
||||
out.writeVLong(0L); // throttleTimeInNanos
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.STORE);
|
||||
builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, sizeInBytes);
|
||||
builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, throttleTime());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -108,7 +99,5 @@ public class StoreStats implements Streamable, ToXContent {
|
|||
static final String STORE = "store";
|
||||
static final String SIZE = "size";
|
||||
static final String SIZE_IN_BYTES = "size_in_bytes";
|
||||
static final String THROTTLE_TIME = "throttle_time";
|
||||
static final String THROTTLE_TIME_IN_MILLIS = "throttle_time_in_millis";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,7 +98,6 @@ import org.elasticsearch.index.shard.IndexShardState;
|
|||
import org.elasticsearch.index.shard.IndexingOperationListener;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
|
@ -169,7 +168,6 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
|
||||
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
|
||||
private final OldShardsStats oldShardsStats = new OldShardsStats();
|
||||
private final IndexStoreConfig indexStoreConfig;
|
||||
private final MapperRegistry mapperRegistry;
|
||||
private final NamedWriteableRegistry namedWriteableRegistry;
|
||||
private final IndexingMemoryController indexingMemoryController;
|
||||
|
@ -196,7 +194,6 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
this.pluginsService = pluginsService;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
|
||||
this.indexStoreConfig = new IndexStoreConfig(settings);
|
||||
this.analysisRegistry = analysisRegistry;
|
||||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
|
@ -204,8 +201,6 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
this.indicesQueryCache = new IndicesQueryCache(settings);
|
||||
this.mapperRegistry = mapperRegistry;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
|
||||
indexingMemoryController = new IndexingMemoryController(settings, threadPool,
|
||||
// ensure we pull an iter with new shards - flatten makes a copy
|
||||
() -> Iterables.flatten(this).iterator());
|
||||
|
@ -439,7 +434,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
idxSettings.getNumberOfReplicas(),
|
||||
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
|
||||
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
|
||||
for (IndexingOperationListener operationListener : indexingOperationListeners) {
|
||||
indexModule.addIndexOperationListener(operationListener);
|
||||
}
|
||||
|
@ -473,7 +468,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
final Index index = indexMetaData.getIndex();
|
||||
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
|
||||
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
|
||||
pluginsService.onIndexModule(indexModule);
|
||||
return indexModule.newIndexMapperService(mapperRegistry);
|
||||
}
|
||||
|
|
|
@ -106,13 +106,13 @@ public class DiskUsageTests extends ESTestCase {
|
|||
test_0 = ShardRoutingHelper.moveToStarted(test_0);
|
||||
Path test0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0");
|
||||
CommonStats commonStats0 = new CommonStats();
|
||||
commonStats0.store = new StoreStats(100, 1);
|
||||
commonStats0.store = new StoreStats(100);
|
||||
ShardRouting test_1 = ShardRouting.newUnassigned(new ShardId(index, 1), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
test_1 = ShardRoutingHelper.initialize(test_1, "node2");
|
||||
test_1 = ShardRoutingHelper.moveToStarted(test_1);
|
||||
Path test1Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("1");
|
||||
CommonStats commonStats1 = new CommonStats();
|
||||
commonStats1.store = new StoreStats(1000, 1);
|
||||
commonStats1.store = new StoreStats(1000);
|
||||
ShardStats[] stats = new ShardStats[] {
|
||||
new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null),
|
||||
new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null)
|
||||
|
@ -141,14 +141,14 @@ public class DiskUsageTests extends ESTestCase {
|
|||
s0 = ShardRoutingHelper.moveToStarted(s0);
|
||||
Path i0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0");
|
||||
CommonStats commonStats0 = new CommonStats();
|
||||
commonStats0.store = new StoreStats(100, 1);
|
||||
commonStats0.store = new StoreStats(100);
|
||||
final Index index2 = new Index("shadow", "0xcafe0001");
|
||||
ShardRouting s1 = ShardRouting.newUnassigned(new ShardId(index2, 0), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
s1 = ShardRoutingHelper.initialize(s1, "node2");
|
||||
s1 = ShardRoutingHelper.moveToStarted(s1);
|
||||
Path i1Path = createTempDir().resolve("indices").resolve(index2.getUUID()).resolve("0");
|
||||
CommonStats commonStats1 = new CommonStats();
|
||||
commonStats1.store = new StoreStats(1000, 1);
|
||||
commonStats1.store = new StoreStats(1000);
|
||||
ShardStats[] stats = new ShardStats[] {
|
||||
new ShardStats(s0, new ShardPath(false, i0Path, i0Path, s0.shardId()), commonStats0 , null, null),
|
||||
new ShardStats(s1, new ShardPath(false, i1Path, i1Path, s1.shardId()), commonStats1 , null, null)
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
||||
|
@ -177,7 +177,7 @@ public class ClusterSettingsIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testClusterSettingsUpdateResponse() {
|
||||
String key1 = IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey();
|
||||
String key1 = RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();
|
||||
int value1 = 10;
|
||||
|
||||
String key2 = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.similarity.SimilarityProvider;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
|
@ -150,7 +149,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testWrapperIsBound() throws IOException {
|
||||
IndexModule module = new IndexModule(indexSettings, null,
|
||||
IndexModule module = new IndexModule(indexSettings,
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.setSearcherWrapper((s) -> new Wrapper());
|
||||
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
|
||||
|
@ -170,7 +169,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
|
||||
.build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
|
||||
IndexModule module = new IndexModule(indexSettings, null,
|
||||
IndexModule module = new IndexModule(indexSettings,
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.addIndexStore("foo_store", FooStore::new);
|
||||
try {
|
||||
|
@ -195,7 +194,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
|
||||
IndexModule module = new IndexModule(indexSettings, null,
|
||||
IndexModule module = new IndexModule(indexSettings,
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.addIndexEventListener(eventListener);
|
||||
IndexService indexService = newIndexService(module);
|
||||
|
@ -210,7 +209,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
|
||||
public void testListener() throws IOException {
|
||||
Setting<Boolean> booleanSetting = Setting.boolSetting("index.foo.bar", false, Property.Dynamic, Property.IndexScope);
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings, booleanSetting), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings, booleanSetting),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
Setting<Boolean> booleanSetting2 = Setting.boolSetting("index.foo.bar.baz", false, Property.Dynamic, Property.IndexScope);
|
||||
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
|
||||
|
@ -230,7 +229,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testAddIndexOperationListener() throws IOException {
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
AtomicBoolean executed = new AtomicBoolean(false);
|
||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||
|
@ -259,7 +258,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testAddSearchOperationListener() throws IOException {
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
AtomicBoolean executed = new AtomicBoolean(false);
|
||||
SearchOperationListener listener = new SearchOperationListener() {
|
||||
|
@ -293,7 +292,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put("index.similarity.my_similarity.key", "there is a key")
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.addSimilarity("test_similarity", (string, settings) -> new SimilarityProvider() {
|
||||
@Override
|
||||
|
@ -317,7 +316,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFrozen() {
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.freeze();
|
||||
String msg = "Can't modify IndexModule once the index service has been created";
|
||||
|
@ -336,7 +335,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
|
||||
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
|
||||
|
@ -348,7 +347,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
|
||||
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
|
||||
|
@ -358,7 +357,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
Settings indexSettings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
|
||||
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
|
||||
|
@ -371,7 +370,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
Settings indexSettings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
IndexService indexService = newIndexService(module);
|
||||
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
|
||||
|
@ -383,7 +382,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null,
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings),
|
||||
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap()));
|
||||
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
|
||||
IndexService indexService = newIndexService(module);
|
||||
|
@ -442,8 +441,8 @@ public class IndexModuleTests extends ESTestCase {
|
|||
|
||||
public static final class FooStore extends IndexStore {
|
||||
|
||||
public FooStore(IndexSettings indexSettings, IndexStoreConfig config) {
|
||||
super(indexSettings, config);
|
||||
public FooStore(IndexSettings indexSettings) {
|
||||
super(indexSettings);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -281,11 +281,6 @@ public class InternalEngineTests extends ESTestCase {
|
|||
public Directory newDirectory() throws IOException {
|
||||
return directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
|
||||
}
|
||||
|
|
|
@ -193,11 +193,6 @@ public class ShadowEngineTests extends ESTestCase {
|
|||
public Directory newDirectory() throws IOException {
|
||||
return directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
|
||||
}
|
||||
|
|
|
@ -104,11 +104,6 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
public Directory newDirectory() throws IOException {
|
||||
return directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
|
||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.store;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FileSwitchDirectory;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.RateLimitedFSDirectory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.store.SleepingLockWrapper;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -45,34 +44,26 @@ public class FsDirectoryServiceTests extends ESTestCase {
|
|||
Settings.builder().put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true).build() :
|
||||
Settings.builder().put(IndexMetaData.SETTING_SHADOW_REPLICAS, true).build();;
|
||||
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
|
||||
IndexStoreConfig config = new IndexStoreConfig(build);
|
||||
IndexStore store = new IndexStore(settings, config);
|
||||
IndexStore store = new IndexStore(settings);
|
||||
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
|
||||
Files.createDirectories(tempDir);
|
||||
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
|
||||
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path);
|
||||
Directory directory = fsDirectoryService.newDirectory();
|
||||
assertTrue(directory instanceof RateLimitedFSDirectory);
|
||||
RateLimitedFSDirectory rateLimitingDirectory = (RateLimitedFSDirectory) directory;
|
||||
Directory delegate = rateLimitingDirectory.getDelegate();
|
||||
assertTrue(delegate.getClass().toString(), delegate instanceof SleepingLockWrapper);
|
||||
assertTrue(directory.getClass().toString(), directory instanceof SleepingLockWrapper);
|
||||
}
|
||||
|
||||
public void testHasNoSleepWrapperOnNormalFS() throws IOException {
|
||||
Settings build = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "simplefs").build();
|
||||
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
|
||||
IndexStoreConfig config = new IndexStoreConfig(build);
|
||||
IndexStore store = new IndexStore(settings, config);
|
||||
IndexStore store = new IndexStore(settings);
|
||||
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
|
||||
Files.createDirectories(tempDir);
|
||||
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
|
||||
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path);
|
||||
Directory directory = fsDirectoryService.newDirectory();
|
||||
assertTrue(directory instanceof RateLimitedFSDirectory);
|
||||
RateLimitedFSDirectory rateLimitingDirectory = (RateLimitedFSDirectory) directory;
|
||||
Directory delegate = rateLimitingDirectory.getDelegate();
|
||||
assertFalse(delegate instanceof SleepingLockWrapper);
|
||||
assertTrue(delegate instanceof SimpleFSDirectory);
|
||||
assertFalse(directory instanceof SleepingLockWrapper);
|
||||
assertTrue(directory instanceof SimpleFSDirectory);
|
||||
}
|
||||
|
||||
public void testPreload() throws IOException {
|
||||
|
@ -87,26 +78,22 @@ public class FsDirectoryServiceTests extends ESTestCase {
|
|||
.putArray(IndexModule.INDEX_STORE_PRE_LOAD_SETTING.getKey(), preload)
|
||||
.build();
|
||||
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
|
||||
IndexStoreConfig config = new IndexStoreConfig(settings.getSettings());
|
||||
IndexStore store = new IndexStore(settings, config);
|
||||
IndexStore store = new IndexStore(settings);
|
||||
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
|
||||
Files.createDirectories(tempDir);
|
||||
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
|
||||
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path);
|
||||
Directory directory = fsDirectoryService.newDirectory();
|
||||
assertTrue(directory instanceof RateLimitedFSDirectory);
|
||||
RateLimitedFSDirectory rateLimitingDirectory = (RateLimitedFSDirectory) directory;
|
||||
Directory delegate = rateLimitingDirectory.getDelegate();
|
||||
assertFalse(delegate instanceof SleepingLockWrapper);
|
||||
assertFalse(directory instanceof SleepingLockWrapper);
|
||||
if (preload.length == 0) {
|
||||
assertTrue(delegate.toString(), delegate instanceof MMapDirectory);
|
||||
assertFalse(((MMapDirectory) delegate).getPreload());
|
||||
assertTrue(directory.toString(), directory instanceof MMapDirectory);
|
||||
assertFalse(((MMapDirectory) directory).getPreload());
|
||||
} else if (Arrays.asList(preload).contains("*")) {
|
||||
assertTrue(delegate.toString(), delegate instanceof MMapDirectory);
|
||||
assertTrue(((MMapDirectory) delegate).getPreload());
|
||||
assertTrue(directory.toString(), directory instanceof MMapDirectory);
|
||||
assertTrue(((MMapDirectory) directory).getPreload());
|
||||
} else {
|
||||
assertTrue(delegate.toString(), delegate instanceof FileSwitchDirectory);
|
||||
FileSwitchDirectory fsd = (FileSwitchDirectory) delegate;
|
||||
assertTrue(directory.toString(), directory instanceof FileSwitchDirectory);
|
||||
FileSwitchDirectory fsd = (FileSwitchDirectory) directory;
|
||||
assertTrue(fsd.getPrimaryDir() instanceof MMapDirectory);
|
||||
assertTrue(((MMapDirectory) fsd.getPrimaryDir()).getPreload());
|
||||
assertTrue(fsd.getSecondaryDir() instanceof MMapDirectory);
|
||||
|
|
|
@ -19,12 +19,10 @@
|
|||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FileSwitchDirectory;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.NoLockFactory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -90,23 +88,4 @@ public class IndexStoreTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testUpdateThrottleType() throws IOException {
|
||||
Settings settings = Settings.builder().put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), "all")
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
IndexStoreConfig indexStoreConfig = new IndexStoreConfig(settings);
|
||||
IndexStore store = new IndexStore(indexSettings, indexStoreConfig);
|
||||
assertEquals(StoreRateLimiting.Type.NONE, store.rateLimiting().getType());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("NODE"));
|
||||
assertEquals(StoreRateLimiting.Type.ALL, store.rateLimiting().getType());
|
||||
assertSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("merge"));
|
||||
assertEquals(StoreRateLimiting.Type.MERGE, store.rateLimiting().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -483,10 +483,6 @@ public class StoreTests extends ESTestCase {
|
|||
return dir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return random.nextInt(1000);
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {
|
||||
|
@ -952,10 +948,6 @@ public class StoreTests extends ESTestCase {
|
|||
assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
|
||||
|
||||
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
|
@ -972,10 +964,6 @@ public class StoreTests extends ESTestCase {
|
|||
final ShardId shardId = new ShardId("index", "_na_", 1);
|
||||
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
|
||||
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
|
@ -1012,10 +1000,6 @@ public class StoreTests extends ESTestCase {
|
|||
final ShardId shardId = new ShardId("index", "_na_", 1);
|
||||
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
|
||||
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
|
|
|
@ -350,10 +350,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
}
|
||||
private Store newStore(Path path, boolean checkIndex) throws IOException {
|
||||
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
|
|
|
@ -26,8 +26,6 @@ import org.apache.logging.log4j.core.LogEvent;
|
|||
import org.apache.logging.log4j.core.appender.AbstractAppender;
|
||||
import org.apache.logging.log4j.core.filter.RegexFilter;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
|
||||
|
@ -42,8 +40,6 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.MergePolicyConfig;
|
||||
import org.elasticsearch.index.MergeSchedulerConfig;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
@ -244,138 +240,6 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
// #6626: make sure we can update throttle settings and the changes take effect
|
||||
public void testUpdateThrottleSettings() {
|
||||
// No throttling at first, only 1 non-replicated shard, force lots of merging:
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
|
||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
|
||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2")
|
||||
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0) // get stats all the time - no caching
|
||||
));
|
||||
ensureGreen();
|
||||
long termUpto = 0;
|
||||
for(int i=0;i<100;i++) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
if (i % 2 == 0) {
|
||||
refresh();
|
||||
}
|
||||
}
|
||||
|
||||
// No merge IO throttling should have happened:
|
||||
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0L));
|
||||
}
|
||||
|
||||
logger.info("test: set low merge throttling");
|
||||
|
||||
// Now updates settings to turn on merge throttling lowish rate
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(
|
||||
Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "merge")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), "1mb"))
|
||||
.get();
|
||||
|
||||
// Make sure setting says it is in fact changed:
|
||||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
assertThat(getSettingsResponse.getSetting("test", IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey()), equalTo("merge"));
|
||||
|
||||
// Also make sure we see throttling kicking in:
|
||||
boolean done = false;
|
||||
while (done == false) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
for(int i=0;i<5;i++) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
sb.append(" some random text that keeps repeating over and over again hambone");
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
}
|
||||
refresh();
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
if (throttleMillis > 0) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("test: disable merge throttling");
|
||||
|
||||
// Now updates settings to disable merge throttling
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none"))
|
||||
.get();
|
||||
|
||||
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
|
||||
logger.info("test: optimize");
|
||||
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).get();
|
||||
logger.info("test: optimize done");
|
||||
|
||||
// Record current throttling so far
|
||||
long sumThrottleTime = 0;
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
}
|
||||
|
||||
// Make sure no further throttling happens:
|
||||
for(int i=0;i<100;i++) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
if (i % 2 == 0) {
|
||||
refresh();
|
||||
}
|
||||
}
|
||||
logger.info("test: done indexing after disabling throttling");
|
||||
|
||||
long newSumThrottleTime = 0;
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
}
|
||||
|
||||
// No additional merge IO throttling should have happened:
|
||||
assertEquals(sumThrottleTime, newSumThrottleTime);
|
||||
|
||||
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
|
||||
// when ESIntegTestCase.after tries to remove indices created by the test:
|
||||
|
||||
// Wait for merges to finish
|
||||
client().admin().indices().prepareForceMerge("test").get();
|
||||
flush();
|
||||
|
||||
logger.info("test: test done");
|
||||
}
|
||||
|
||||
private static class MockAppender extends AbstractAppender {
|
||||
public boolean sawUpdateMaxThreadCount;
|
||||
public boolean sawUpdateAutoThrottle;
|
||||
|
|
|
@ -290,7 +290,6 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
public void testNonThrottleStats() throws Exception {
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(settingsBuilder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "merge")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
|
||||
|
@ -322,7 +321,6 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
public void testThrottleStats() throws Exception {
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(settingsBuilder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "merge")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
|
||||
|
|
|
@ -46,7 +46,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
@ -683,108 +682,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621")
|
||||
public void testChaosSnapshot() throws Exception {
|
||||
final List<String> indices = new CopyOnWriteArrayList<>();
|
||||
int initialNodes = between(1, 3);
|
||||
logger.info("--> start {} nodes", initialNodes);
|
||||
for (int i = 0; i < initialNodes; i++) {
|
||||
internalCluster().startNode();
|
||||
}
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(Settings.builder()
|
||||
.put("location", randomRepoPath())
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||
|
||||
int initialIndices = between(1, 3);
|
||||
logger.info("--> create {} indices", initialIndices);
|
||||
for (int i = 0; i < initialIndices; i++) {
|
||||
createTestIndex("test-" + i);
|
||||
indices.add("test-" + i);
|
||||
}
|
||||
|
||||
int asyncNodes = between(0, 5);
|
||||
logger.info("--> start {} additional nodes asynchronously", asyncNodes);
|
||||
InternalTestCluster.Async<List<String>> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes);
|
||||
|
||||
int asyncIndices = between(0, 10);
|
||||
logger.info("--> create {} additional indices asynchronously", asyncIndices);
|
||||
Thread[] asyncIndexThreads = new Thread[asyncIndices];
|
||||
for (int i = 0; i < asyncIndices; i++) {
|
||||
final int cur = i;
|
||||
asyncIndexThreads[i] = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
createTestIndex("test-async-" + cur);
|
||||
indices.add("test-async-" + cur);
|
||||
|
||||
}
|
||||
});
|
||||
asyncIndexThreads[i].start();
|
||||
}
|
||||
|
||||
logger.info("--> snapshot");
|
||||
|
||||
ListenableActionFuture<CreateSnapshotResponse> snapshotResponseFuture = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-*").setPartial(true).execute();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
// Produce chaos for 30 sec or until snapshot is done whatever comes first
|
||||
int randomIndices = 0;
|
||||
while (System.currentTimeMillis() - start < 30000 && !snapshotIsDone("test-repo", "test-snap")) {
|
||||
Thread.sleep(100);
|
||||
int chaosType = randomInt(10);
|
||||
if (chaosType < 4) {
|
||||
// Randomly delete an index
|
||||
if (indices.size() > 0) {
|
||||
String index = indices.remove(randomInt(indices.size() - 1));
|
||||
logger.info("--> deleting random index [{}]", index);
|
||||
internalCluster().wipeIndices(index);
|
||||
}
|
||||
} else if (chaosType < 6) {
|
||||
// Randomly shutdown a node
|
||||
if (cluster().size() > 1) {
|
||||
logger.info("--> shutting down random node");
|
||||
internalCluster().stopRandomDataNode();
|
||||
}
|
||||
} else if (chaosType < 8) {
|
||||
// Randomly create an index
|
||||
String index = "test-rand-" + randomIndices;
|
||||
logger.info("--> creating random index [{}]", index);
|
||||
createTestIndex(index);
|
||||
randomIndices++;
|
||||
} else {
|
||||
// Take a break
|
||||
logger.info("--> noop");
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("--> waiting for async indices creation to finish");
|
||||
for (int i = 0; i < asyncIndices; i++) {
|
||||
asyncIndexThreads[i].join();
|
||||
}
|
||||
|
||||
logger.info("--> update index settings to back to normal");
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node")
|
||||
));
|
||||
|
||||
// Make sure that snapshot finished - doesn't matter if it failed or succeeded
|
||||
try {
|
||||
CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get();
|
||||
SnapshotInfo snapshotInfo = snapshotResponse.getSnapshotInfo();
|
||||
assertNotNull(snapshotInfo);
|
||||
logger.info("--> snapshot is done with state [{}], total shards [{}], successful shards [{}]", snapshotInfo.state(), snapshotInfo.totalShards(), snapshotInfo.successfulShards());
|
||||
} catch (Exception ex) {
|
||||
logger.info("--> snapshot didn't start properly", ex);
|
||||
}
|
||||
|
||||
asyncNodesFuture.get();
|
||||
logger.info("--> done");
|
||||
}
|
||||
|
||||
public void testMasterShutdownDuringSnapshot() throws Exception {
|
||||
Settings masterSettings = Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build();
|
||||
Settings dataSettings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build();
|
||||
|
@ -887,10 +784,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
index(name, "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "all")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), between(100, 50000))
|
||||
));
|
||||
}
|
||||
|
||||
static {
|
||||
|
|
|
@ -67,9 +67,9 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.ingest.IngestTestPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
|
@ -1674,12 +1674,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
refresh();
|
||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo(100L));
|
||||
|
||||
// Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished
|
||||
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "all")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), 100, ByteSizeUnit.BYTES)
|
||||
));
|
||||
|
||||
logger.info("--> start relocations");
|
||||
allowNodes("test-idx", internalCluster().numDataNodes());
|
||||
|
||||
|
@ -1690,11 +1684,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
logger.info("--> snapshot");
|
||||
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
|
||||
|
||||
// Update settings to back to normal
|
||||
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node")
|
||||
));
|
||||
|
||||
logger.info("--> wait for snapshot to complete");
|
||||
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
|
||||
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
|
||||
|
|
|
@ -23,12 +23,11 @@ import org.elasticsearch.index.IndexSettings;
|
|||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
|
||||
public class SmbMmapFsIndexStore extends IndexStore {
|
||||
|
||||
public SmbMmapFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
|
||||
super(indexSettings, indexStoreConfig);
|
||||
public SmbMmapFsIndexStore(IndexSettings indexSettings) {
|
||||
super(indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,12 +23,11 @@ import org.elasticsearch.index.IndexSettings;
|
|||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
|
||||
public class SmbSimpleFsIndexStore extends IndexStore {
|
||||
|
||||
public SmbSimpleFsIndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
|
||||
super(indexSettings, indexStoreConfig);
|
||||
public SmbSimpleFsIndexStore(IndexSettings indexSettings) {
|
||||
super(indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -126,11 +126,6 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
public Directory newDirectory() throws IOException {
|
||||
return newFSDirectory(shardPath.resolveIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
|||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
|
@ -78,7 +77,6 @@ import org.elasticsearch.index.engine.CommitStats;
|
|||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
|
@ -420,17 +418,6 @@ public final class InternalTestCluster extends TestCluster {
|
|||
builder.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
if (random.nextInt(10) == 0) { // do something crazy slow here
|
||||
builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(RandomNumbers.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
|
||||
} else {
|
||||
builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(RandomNumbers.randomIntBetween(random, 10, 200), ByteSizeUnit.MB));
|
||||
}
|
||||
}
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), RandomPicks.randomFrom(random, StoreRateLimiting.Type.values()));
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
if (random.nextInt(10) == 0) { // do something crazy slow here
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(RandomNumbers.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory;
|
|||
import org.apache.lucene.store.LockFactory;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestRuleMarkFailure;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -150,21 +149,6 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPause(long nanos) {
|
||||
delegateService.onPause(nanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return delegateService.rateLimiting();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return delegateService.throttleTimeInNanos();
|
||||
}
|
||||
|
||||
private Directory wrap(Directory dir) {
|
||||
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, this.crashIndex);
|
||||
w.setRandomIOExceptionRate(randomIOExceptionRate);
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -77,9 +76,8 @@ public class MockFSIndexStore extends IndexStore {
|
|||
}
|
||||
}
|
||||
|
||||
MockFSIndexStore(IndexSettings indexSettings,
|
||||
IndexStoreConfig config) {
|
||||
super(indexSettings, config);
|
||||
MockFSIndexStore(IndexSettings indexSettings) {
|
||||
super(indexSettings);
|
||||
}
|
||||
|
||||
public DirectoryService newDirectoryService(ShardPath path) {
|
||||
|
|
Loading…
Reference in New Issue