Remove IndexStore and DirectoryService (#42446)

Both of these classes are basically a bloated wrapper around a simple
construct that can simply be a DirectoryFactory interface. This change
removes both classes and replaces them with a simple stateless interface
that creates a new `Directory` per shard. The concept of `index.store` is preserved
since it makes sense from a configuration perspective.
This commit is contained in:
Simon Willnauer 2019-05-24 10:44:59 +02:00
parent 7cee294acf
commit 46ccfba808
23 changed files with 213 additions and 382 deletions

View File

@ -0,0 +1,28 @@
[[breaking-changes-7.3]]
== Breaking changes in 7.3
++++
<titleabbrev>7.3</titleabbrev>
++++
This section discusses the changes that you need to be aware of when migrating
your application to Elasticsearch 7.3.
See also <<release-highlights>> and <<es-release-notes>>.
coming[7.3.0]
//NOTE: The notable-breaking-changes tagged regions are re-used in the
//Installation and Upgrade Guide
//tag::notable-breaking-changes[]
// end::notable-breaking-changes[]
[[breaking_73_plugin_changes]]
=== Plugins changes
[float]
==== IndexStorePlugin changes
IndexStore and DirectoryService have been replaced by a stateless and simple
DirectoryFactory interface to create custom Lucene directory instances per shard.

View File

@ -23,22 +23,16 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.SmbDirectoryWrapper;
import java.io.IOException;
import java.nio.file.Path;
public class SmbMmapFsDirectoryService extends FsDirectoryService {
public SmbMmapFsDirectoryService(IndexSettings indexSettings, ShardPath path) {
super(indexSettings, path);
}
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {
@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
logger.debug("wrapping MMapDirectory for SMB");
return new SmbDirectoryWrapper(new MMapDirectory(location, indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING)));
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new MMapDirectory(location, lockFactory));
}
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.smbmmapfs;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
public class SmbMmapFsIndexStore extends IndexStore {
public SmbMmapFsIndexStore(IndexSettings indexSettings) {
super(indexSettings);
}
@Override
public DirectoryService newDirectoryService(ShardPath path) {
return new SmbMmapFsDirectoryService(indexSettings, path);
}
}

View File

@ -23,22 +23,16 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.SmbDirectoryWrapper;
import java.io.IOException;
import java.nio.file.Path;
public class SmbSimpleFsDirectoryService extends FsDirectoryService {
public SmbSimpleFsDirectoryService(IndexSettings indexSettings, ShardPath path) {
super(indexSettings, path);
}
public final class SmbSimpleFsDirectoryFactory extends FsDirectoryFactory {
@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
logger.debug("wrapping SimpleFSDirectory for SMB");
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new SimpleFSDirectory(location, lockFactory));
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.smbsimplefs;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
public class SmbSimpleFsIndexStore extends IndexStore {
public SmbSimpleFsIndexStore(IndexSettings indexSettings) {
super(indexSettings);
}
@Override
public DirectoryService newDirectoryService(ShardPath path) {
return new SmbSimpleFsDirectoryService(indexSettings, path);
}
}

View File

@ -19,25 +19,22 @@
package org.elasticsearch.plugin.store.smb;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.smbmmapfs.SmbMmapFsIndexStore;
import org.elasticsearch.index.store.smbsimplefs.SmbSimpleFsIndexStore;
import org.elasticsearch.index.store.smbmmapfs.SmbMmapFsDirectoryFactory;
import org.elasticsearch.index.store.smbsimplefs.SmbSimpleFsDirectoryFactory;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class SMBStorePlugin extends Plugin implements IndexStorePlugin {
@Override
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories = new HashMap<>(2);
indexStoreFactories.put("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexStoreFactories.put("smb_simple_fs", SmbSimpleFsIndexStore::new);
public Map<String, DirectoryFactory> getDirectoryFactories() {
final Map<String, DirectoryFactory> indexStoreFactories = new HashMap<>(2);
indexStoreFactories.put("smb_mmap_fs", new SmbMmapFsDirectoryFactory());
indexStoreFactories.put("smb_simple_fs", new SmbSimpleFsDirectoryFactory());
return Collections.unmodifiableMap(indexStoreFactories);
}

View File

@ -38,7 +38,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesRequestCache;
@ -159,7 +159,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
Store.FORCE_RAM_TERM_DICT,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,

View File

@ -52,7 +52,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.monitor.fs.FsProbe;
import org.elasticsearch.monitor.jvm.JvmInfo;
@ -469,7 +469,7 @@ public final class NodeEnvironment implements Closeable {
// resolve the directory the shard actually lives in
Path p = shardPaths[i].resolve("index");
// open a directory (will be immediately closed) on the shard's location
dirs[i] = new SimpleFSDirectory(p, indexSettings.getValue(FsDirectoryService.INDEX_LOCK_FACTOR_SETTING));
dirs[i] = new SimpleFSDirectory(p, indexSettings.getValue(FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING));
// create a lock for the "write.lock" file
try {
locks[i] = dirs[i].obtainLock(IndexWriter.WRITE_LOCK_NAME);

View File

@ -45,7 +45,8 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -75,7 +76,7 @@ import java.util.function.Function;
* {@link #addSimilarity(String, TriFunction)} while existing Providers can be referenced through Settings under the
* {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
* {@link BM25Similarity}, the configuration {@code "index.similarity.my_similarity.type : "BM25"} can be used.</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link IndexStorePlugin}</li>
* <li>{@link DirectoryService} - Custom {@link DirectoryService} instances can be registered via {@link IndexStorePlugin}</li>
* <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
* {@link #addIndexEventListener(IndexEventListener)}</li>
* <li>Settings update listener - Custom settings update listener can be registered via
@ -86,6 +87,8 @@ public final class IndexModule {
public static final Setting<Boolean> NODE_STORE_ALLOW_MMAP = Setting.boolSetting("node.store.allow_mmap", true, Property.NodeScope);
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
public static final Setting<String> INDEX_STORE_TYPE_SETTING =
new Setting<>("index.store.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
@ -112,7 +115,7 @@ public final class IndexModule {
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
@ -125,19 +128,19 @@ public final class IndexModule {
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexStoreFactories the available store types
* @param directoryFactories the available store types
*/
public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.indexStoreFactories = Collections.unmodifiableMap(indexStoreFactories);
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
}
/**
@ -384,7 +387,7 @@ public final class IndexModule {
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
? (shard) -> null : indexSearcherWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStore store = getIndexStore(indexSettings, indexStoreFactories);
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final QueryCache queryCache;
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
@ -399,12 +402,12 @@ public final class IndexModule {
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
client, queryCache, directoryFactory, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}
private static IndexStore getIndexStore(
final IndexSettings indexSettings, final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
final IndexSettings indexSettings, final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories) {
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final Type type;
final Boolean allowMmap = NODE_STORE_ALLOW_MMAP.get(indexSettings.getNodeSettings());
@ -420,20 +423,16 @@ public final class IndexModule {
if (allowMmap == false && (type == Type.MMAPFS || type == Type.HYBRIDFS)) {
throw new IllegalArgumentException("store type [" + storeType + "] is not allowed because mmap is disabled");
}
final IndexStore store;
final IndexStorePlugin.DirectoryFactory factory;
if (storeType.isEmpty() || isBuiltinType(storeType)) {
store = new IndexStore(indexSettings);
factory = DEFAULT_DIRECTORY_FACTORY;
} else {
Function<IndexSettings, IndexStore> factory = indexStoreFactories.get(storeType);
factory = indexStoreFactories.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
store = factory.apply(indexSettings);
if (store == null) {
throw new IllegalStateException("store must not be null");
}
}
return store;
return factory;
}
/**

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
@ -65,14 +66,13 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -103,7 +103,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final BitsetFilterCache bitsetFilterCache;
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStore indexStore;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexSearcherWrapper searcherWrapper;
private final IndexCache indexCache;
private final MapperService mapperService;
@ -149,7 +149,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
ScriptService scriptService,
Client client,
QueryCache queryCache,
IndexStore indexStore,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexEventListener eventListener,
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry,
@ -200,7 +200,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.client = client;
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.indexStore = indexStore;
this.directoryFactory = directoryFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
@ -401,9 +401,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
warmer.warm(searcher, shard, IndexService.this.indexSettings);
}
};
// TODO we can remove either IndexStore or DirectoryService. All we need is a simple Supplier<Directory>
DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(shardId, this.indexSettings, directory, lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
@ -753,8 +752,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
return searcherWrapper;
} // pkg private for testing
final IndexStore getIndexStore() {
return indexStore;
final IndexStorePlugin.DirectoryFactory getDirectoryFactory() {
return directoryFactory;
} // pkg private for testing
private void maybeFSyncTranslogs() {

View File

@ -30,13 +30,13 @@ import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.plugins.IndexStorePlugin;
import java.io.IOException;
import java.nio.file.Files;
@ -44,7 +44,8 @@ import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
public class FsDirectoryService extends DirectoryService {
public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
public static final Setting<LockFactory> INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> {
switch (s) {
case "native":
@ -56,27 +57,20 @@ public class FsDirectoryService extends DirectoryService {
} // can we set on both - node and index level, some nodes might be running on NFS so they might need simple rather than native
}, Property.IndexScope, Property.NodeScope);
private final ShardPath path;
@Inject
public FsDirectoryService(IndexSettings indexSettings, ShardPath path) {
super(path.getShardId(), indexSettings);
this.path = path;
}
@Override
public Directory newDirectory() throws IOException {
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
final Path location = path.resolveIndex();
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING);
Files.createDirectories(location);
Directory wrapped = newFSDirectory(location, lockFactory);
Directory wrapped = newFSDirectory(location, lockFactory, indexSettings);
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions);
return wrapped;
}
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
final String storeType =
indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey());
IndexModule.Type type;

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
public class IndexStore extends AbstractIndexComponent {
public IndexStore(IndexSettings indexSettings) {
super(indexSettings);
}
/**
* The shard store class that should be used for each shard.
*/
public DirectoryService newDirectoryService(ShardPath path) {
return new FsDirectoryService(indexSettings, path);
}
}

View File

@ -110,13 +110,13 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
@ -201,7 +201,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
@ -217,7 +217,7 @@ public class IndicesService extends AbstractLifecycleComponent
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
@ -252,13 +252,13 @@ public class IndicesService extends AbstractLifecycleComponent
this.engineFactoryProviders = engineFactoryProviders;
// do not allow any plugin-provided index store type to conflict with a built-in type
for (final String indexStoreType : indexStoreFactories.keySet()) {
for (final String indexStoreType : directoryFactories.keySet()) {
if (IndexModule.isBuiltinType(indexStoreType)) {
throw new IllegalStateException("registered index store type [" + indexStoreType + "] conflicts with a built-in type");
}
}
this.indexStoreFactories = indexStoreFactories;
this.directoryFactories = directoryFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
@ -553,7 +553,7 @@ public class IndicesService extends AbstractLifecycleComponent
idxSettings.getNumberOfReplicas(),
indexCreationContext);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), directoryFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
@ -620,7 +620,7 @@ public class IndicesService extends AbstractLifecycleComponent
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), directoryFactories);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}

View File

@ -100,7 +100,6 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
@ -410,10 +409,10 @@ public class Node implements Closeable {
.collect(Collectors.toList());
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories =
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreFactories)
.map(IndexStorePlugin::getDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

View File

@ -19,24 +19,40 @@
package org.elasticsearch.plugins;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.shard.ShardPath;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
/**
* A plugin that provides alternative index store implementations.
* A plugin that provides alternative directory implementations.
*/
public interface IndexStorePlugin {
/**
* The index store factories for this plugin. When an index is created the store type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
* built-in type, or looked up among all the index store factories from {@link IndexStore} plugins.
*
* @return a map from store type to an index store factory
* An interface that describes how to create a new directory instance per shard.
*/
Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories();
@FunctionalInterface
interface DirectoryFactory {
/**
* Creates a new directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @return a new lucene directory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}
/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
* built-in type, or looked up among all the directory factories from {@link IndexStorePlugin} plugins.
*
* @return a map from store type to an directory factory
*/
Map<String, DirectoryFactory> getDirectoryFactories();
}

View File

@ -29,6 +29,7 @@ import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -60,9 +61,10 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.NonNegativeScoresSimilarity;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -70,6 +72,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ClusterServiceUtils;
@ -86,7 +89,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
@ -174,11 +176,12 @@ public class IndexModuleTests extends ESTestCase {
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories = Collections.singletonMap("foo_store", FooStore::new);
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = Collections.singletonMap(
"foo_store", new FooFunction());
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories);
final IndexService indexService = newIndexService(module);
assertThat(indexService.getIndexStore(), instanceOf(FooStore.class));
assertThat(indexService.getDirectoryFactory(), instanceOf(FooFunction.class));
indexService.close("simon says", false);
}
@ -444,10 +447,11 @@ public class IndexModuleTests extends ESTestCase {
}
}
public static final class FooStore extends IndexStore {
public static final class FooFunction implements IndexStorePlugin.DirectoryFactory {
public FooStore(IndexSettings indexSettings) {
super(indexSettings);
@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return new FsDirectoryFactory().newDirectory(indexSettings, shardPath);
}
}

View File

@ -125,7 +125,7 @@ import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSDirectoryFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Assert;
@ -3827,7 +3827,7 @@ public class IndexShardTests extends IndexShardTestCase {
readyToCloseLatch.await();
shard.close("testing", false);
// in integration tests, this is done as a listener on IndexService.
MockFSDirectoryService.checkIndex(logger, shard.store(), shard.shardId);
MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId);
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
} finally {

View File

@ -19,10 +19,12 @@
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FileSwitchDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SleepingLockWrapper;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -36,32 +38,68 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Locale;
public class IndexStoreTests extends ESTestCase {
public class FsDirectoryFactoryTests extends ESTestCase {
public void testPreload() throws IOException {
doTestPreload();
doTestPreload("nvd", "dvd", "tim");
doTestPreload("*");
}
private void doTestPreload(String...preload) throws IOException {
Settings build = Settings.builder()
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "mmapfs")
.putList(IndexModule.INDEX_STORE_PRE_LOAD_SETTING.getKey(), preload)
.build();
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
FsDirectoryFactory fsDirectoryFactory = new FsDirectoryFactory();
Directory directory = fsDirectoryFactory.newDirectory(settings, path);
assertFalse(directory instanceof SleepingLockWrapper);
if (preload.length == 0) {
assertTrue(directory.toString(), directory instanceof MMapDirectory);
assertFalse(((MMapDirectory) directory).getPreload());
} else if (Arrays.asList(preload).contains("*")) {
assertTrue(directory.toString(), directory instanceof MMapDirectory);
assertTrue(((MMapDirectory) directory).getPreload());
} else {
assertTrue(directory.toString(), directory instanceof FileSwitchDirectory);
FileSwitchDirectory fsd = (FileSwitchDirectory) directory;
assertTrue(fsd.getPrimaryDir() instanceof MMapDirectory);
assertTrue(((MMapDirectory) fsd.getPrimaryDir()).getPreload());
assertTrue(fsd.getSecondaryDir() instanceof MMapDirectory);
assertFalse(((MMapDirectory) fsd.getSecondaryDir()).getPreload());
}
}
public void testStoreDirectory() throws IOException {
Index index = new Index("foo", "fooUUID");
final Path tempDir = createTempDir().resolve(index.getUUID()).resolve("0");
// default
doTestStoreDirectory(index, tempDir, null, IndexModule.Type.FS);
doTestStoreDirectory(tempDir, null, IndexModule.Type.FS);
// explicit directory impls
for (IndexModule.Type type : IndexModule.Type.values()) {
doTestStoreDirectory(index, tempDir, type.name().toLowerCase(Locale.ROOT), type);
doTestStoreDirectory(tempDir, type.name().toLowerCase(Locale.ROOT), type);
}
}
private void doTestStoreDirectory(Index index, Path tempDir, String typeSettingValue, IndexModule.Type type) throws IOException {
private void doTestStoreDirectory(Path tempDir, String typeSettingValue, IndexModule.Type type) throws IOException {
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);
if (typeSettingValue != null) {
settingsBuilder.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), typeSettingValue);
}
Settings settings = settingsBuilder.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
FsDirectoryService service = new FsDirectoryService(indexSettings, new ShardPath(false, tempDir, tempDir, new ShardId(index, 0)));
try (Directory directory = service.newFSDirectory(tempDir, NoLockFactory.INSTANCE)) {
FsDirectoryFactory service = new FsDirectoryFactory();
try (Directory directory = service.newFSDirectory(tempDir, NoLockFactory.INSTANCE, indexSettings)) {
switch (type) {
case HYBRIDFS:
assertHybridDirectory(directory);
@ -91,8 +129,8 @@ public class IndexStoreTests extends ESTestCase {
}
private void assertHybridDirectory(Directory directory) {
assertTrue(directory.toString(), directory instanceof FsDirectoryService.HybridDirectory);
Directory randomAccessDirectory = ((FsDirectoryService.HybridDirectory) directory).getRandomAccessDirectory();
assertTrue(directory.toString(), directory instanceof FsDirectoryFactory.HybridDirectory);
Directory randomAccessDirectory = ((FsDirectoryFactory.HybridDirectory) directory).getRandomAccessDirectory();
assertTrue("randomAccessDirectory: " + randomAccessDirectory.toString(), randomAccessDirectory instanceof MMapDirectory);
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FileSwitchDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.SleepingLockWrapper;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
public class FsDirectoryServiceTests extends ESTestCase {
public void testPreload() throws IOException {
doTestPreload();
doTestPreload("nvd", "dvd", "tim");
doTestPreload("*");
}
private void doTestPreload(String...preload) throws IOException {
Settings build = Settings.builder()
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "mmapfs")
.putList(IndexModule.INDEX_STORE_PRE_LOAD_SETTING.getKey(), preload)
.build();
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, path);
Directory directory = fsDirectoryService.newDirectory();
assertFalse(directory instanceof SleepingLockWrapper);
if (preload.length == 0) {
assertTrue(directory.toString(), directory instanceof MMapDirectory);
assertFalse(((MMapDirectory) directory).getPreload());
} else if (Arrays.asList(preload).contains("*")) {
assertTrue(directory.toString(), directory instanceof MMapDirectory);
assertTrue(((MMapDirectory) directory).getPreload());
} else {
assertTrue(directory.toString(), directory instanceof FileSwitchDirectory);
FileSwitchDirectory fsd = (FileSwitchDirectory) directory;
assertTrue(fsd.getPrimaryDir() instanceof MMapDirectory);
assertTrue(((MMapDirectory) fsd.getPrimaryDir()).getPreload());
assertTrue(fsd.getSecondaryDir() instanceof MMapDirectory);
assertFalse(((MMapDirectory) fsd.getSecondaryDir()).getPreload());
}
}
}

View File

@ -22,15 +22,13 @@ package org.elasticsearch.plugins;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
@ -41,8 +39,8 @@ public class IndexStorePluginTests extends ESTestCase {
public static class BarStorePlugin extends Plugin implements IndexStorePlugin {
@Override
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
return Collections.singletonMap("store", IndexStore::new);
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap("store", new FsDirectoryFactory());
}
}
@ -50,8 +48,8 @@ public class IndexStorePluginTests extends ESTestCase {
public static class FooStorePlugin extends Plugin implements IndexStorePlugin {
@Override
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
return Collections.singletonMap("store", IndexStore::new);
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap("store", new FsDirectoryFactory());
}
}
@ -65,8 +63,8 @@ public class IndexStorePluginTests extends ESTestCase {
}
@Override
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
return Collections.singletonMap(TYPE, IndexStore::new);
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap(TYPE, new FsDirectoryFactory());
}
}
@ -86,11 +84,11 @@ public class IndexStorePluginTests extends ESTestCase {
if (JavaVersion.current().compareTo(JavaVersion.parse("9")) >= 0) {
assertThat(e, hasToString(matches(
"java.lang.IllegalStateException: Duplicate key store \\(attempted merging values " +
"org.elasticsearch.plugins.IndexStorePluginTests\\$BarStorePlugin.* " +
"and org.elasticsearch.plugins.IndexStorePluginTests\\$FooStorePlugin.*\\)")));
"org.elasticsearch.index.store.FsDirectoryFactory@[\\w\\d]+ " +
"and org.elasticsearch.index.store.FsDirectoryFactory@[\\w\\d]+\\)")));
} else {
assertThat(e, hasToString(matches(
"java.lang.IllegalStateException: Duplicate key org.elasticsearch.plugins.IndexStorePluginTests\\$BarStorePlugin.*")));
"java.lang.IllegalStateException: Duplicate key org.elasticsearch.index.store.FsDirectoryFactory@[\\w\\d]+")));
}
}

View File

@ -37,7 +37,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSDirectoryFactory;
import org.elasticsearch.test.store.MockFSIndexStore;
import java.io.IOException;
import java.util.Arrays;
@ -104,16 +104,16 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
client().admin().indices().prepareFlush("test").execute().get();
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), exceptionOnOpenRate));
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), exceptionOnOpenRate));
client().admin().indices().prepareOpen("test").execute().get();
} else {
Settings.Builder settings = Settings.builder()
.put("index.number_of_replicas", randomIntBetween(0, 1))
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)
// we cannot expect that the index will be valid
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), exceptionOnOpenRate);
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), exceptionOnOpenRate);
logger.info("creating index: [test] using settings: [{}]", settings.build());
client().admin().indices().prepareCreate("test")
.setSettings(settings)
@ -195,8 +195,8 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
// check the index still contains the records that we indexed without errors
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), 0)
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), 0));
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), 0)
.put(MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.getKey(), 0));
client().admin().indices().prepareOpen("test").execute().get();
ensureGreen();
SearchResponse searchResponse = client().prepareSearch().setTypes("type")

View File

@ -19,19 +19,16 @@
package org.elasticsearch.test.store;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestRuleMarkFailure;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
@ -41,8 +38,9 @@ import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assert;
@ -51,12 +49,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
public class MockFSDirectoryService extends FsDirectoryService {
public class MockFSDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
public static final Setting<Double> RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING =
Setting.doubleSetting("index.store.mock.random.io_exception_rate_on_open", 0.0d, 0.0d, Property.IndexScope, Property.NodeScope);
@ -65,42 +62,12 @@ public class MockFSDirectoryService extends FsDirectoryService {
public static final Setting<Boolean> CRASH_INDEX_SETTING =
Setting.boolSetting("index.store.mock.random.crash_index", true, Property.IndexScope, Property.NodeScope);
private final FsDirectoryService delegateService;
private final Random random;
private final double randomIOExceptionRate;
private final double randomIOExceptionRateOnOpen;
private final MockDirectoryWrapper.Throttling throttle;
private final boolean crashIndex;
@Inject
public MockFSDirectoryService(IndexSettings idxSettings, final ShardPath path) {
super(idxSettings, path);
@Override
public Directory newDirectory(IndexSettings idxSettings, ShardPath path) throws IOException {
Settings indexSettings = idxSettings.getSettings();
final long seed = idxSettings.getValue(ESIntegTestCase.INDEX_TEST_SEED_SETTING);
this.random = new Random(seed);
randomIOExceptionRate = RANDOM_IO_EXCEPTION_RATE_SETTING.get(indexSettings);
randomIOExceptionRateOnOpen = RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.get(indexSettings);
random.nextInt(shardId.getId() + 1); // some randomness per shard
throttle = MockDirectoryWrapper.Throttling.NEVER;
crashIndex = CRASH_INDEX_SETTING.get(indexSettings);
if (logger.isDebugEnabled()) {
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] crashIndex: [{}]", SeedUtils.formatSeed(seed),
throttle, crashIndex);
}
delegateService = randomDirectoryService(idxSettings, path);
}
@Override
public Directory newDirectory() throws IOException {
return wrap(delegateService.newDirectory());
}
@Override
protected synchronized Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
throw new UnsupportedOperationException();
Random random = new Random(idxSettings.getValue(ESIntegTestCase.INDEX_TEST_SEED_SETTING));
return wrap(randomDirectoryService(random, idxSettings, path), random, indexSettings,
path.getShardId());
}
public static void checkIndex(Logger logger, Store store, ShardId shardId) {
@ -138,8 +105,14 @@ public class MockFSDirectoryService extends FsDirectoryService {
}
}
private Directory wrap(Directory dir) {
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, this.crashIndex);
private Directory wrap(Directory dir, Random random, Settings indexSettings, ShardId shardId) {
double randomIOExceptionRate = RANDOM_IO_EXCEPTION_RATE_SETTING.get(indexSettings);
double randomIOExceptionRateOnOpen = RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING.get(indexSettings);
random.nextInt(shardId.getId() + 1); // some randomness per shard
MockDirectoryWrapper.Throttling throttle = MockDirectoryWrapper.Throttling.NEVER;
boolean crashIndex = CRASH_INDEX_SETTING.get(indexSettings);
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, crashIndex);
w.setRandomIOExceptionRate(randomIOExceptionRate);
w.setRandomIOExceptionRateOnOpen(randomIOExceptionRateOnOpen);
w.setThrottling(throttle);
@ -151,7 +124,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
return w;
}
private FsDirectoryService randomDirectoryService(IndexSettings indexSettings, ShardPath path) {
private Directory randomDirectoryService(Random random, IndexSettings indexSettings, ShardPath path) throws IOException {
final IndexMetaData build = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder()
// don't use the settings from indexSettings#getSettings() they are merged with node settings and might contain
@ -161,7 +134,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
RandomPicks.randomFrom(random, IndexModule.Type.values()).getSettingsKey()))
.build();
final IndexSettings newIndexSettings = new IndexSettings(build, indexSettings.getNodeSettings());
return new FsDirectoryService(newIndexSettings, path);
return new FsDirectoryFactory().newDirectory(newIndexSettings, path);
}
public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper {

View File

@ -26,14 +26,10 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.Plugin;
@ -43,9 +39,8 @@ import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class MockFSIndexStore extends IndexStore {
public final class MockFSIndexStore {
public static final Setting<Boolean> INDEX_CHECK_INDEX_ON_CLOSE_SETTING =
Setting.boolSetting("index.store.mock.check_index_on_close", true, Property.IndexScope, Property.NodeScope);
@ -59,14 +54,14 @@ public class MockFSIndexStore extends IndexStore {
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(INDEX_CHECK_INDEX_ON_CLOSE_SETTING,
MockFSDirectoryService.CRASH_INDEX_SETTING,
MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING,
MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING);
MockFSDirectoryFactory.CRASH_INDEX_SETTING,
MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_SETTING,
MockFSDirectoryFactory.RANDOM_IO_EXCEPTION_RATE_ON_OPEN_SETTING);
}
@Override
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
return Collections.singletonMap("mock", MockFSIndexStore::new);
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap("mock", new MockFSDirectoryFactory());
}
@Override
@ -80,15 +75,6 @@ public class MockFSIndexStore extends IndexStore {
}
}
MockFSIndexStore(IndexSettings indexSettings) {
super(indexSettings);
}
@Override
public DirectoryService newDirectoryService(ShardPath path) {
return new MockFSDirectoryService(indexSettings, path);
}
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
IndexShardState.STARTED, IndexShardState.POST_RECOVERY
);
@ -101,7 +87,7 @@ public class MockFSIndexStore extends IndexStore {
Boolean remove = shardSet.remove(indexShard);
if (remove == Boolean.TRUE) {
Logger logger = Loggers.getLogger(getClass(), indexShard.shardId());
MockFSDirectoryService.checkIndex(logger, indexShard.store(), indexShard.shardId());
MockFSDirectoryFactory.checkIndex(logger, indexShard.store(), indexShard.shardId());
}
}
}
@ -115,5 +101,4 @@ public class MockFSIndexStore extends IndexStore {
}
}
}