Replace IndicesLifecycle with a per-index IndexEventListener
Today IndicesLifecycle is a per-node class that allows to register listeners at any time. It also requires to de-register if listeners are not needed anymore ie. if classes are created per-index / shard etc. They also cause issues where listeners are registered more than once as in #13259 This commit removes the per-node class and replaces it with an well defined extension point that allows listeners to be registered at index creation time without the need to unregister since listeners are go out of scope if the index goes out of scope. Yet, this still allows to share instances across indices as before but without the risk of double registering them etc. All data-structures used for event notifications are now immuatble and can only changes on index creation time. This removes flexibility to some degree but increases maintainability of the interface and the code itself dramatically especially with the step by step removal of the index level dependency injection. Closes #13259
This commit is contained in:
parent
75cedca0da
commit
4fea56cead
|
@ -80,13 +80,7 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -313,7 +307,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
// Set up everything, now locally create the index to see that things are ok, and apply
|
||||
final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build();
|
||||
// create the index here (on the master) to validate it can be created, as well as adding the mapping
|
||||
indicesService.createIndex(tmpImd);
|
||||
indicesService.createIndex(tmpImd, Collections.EMPTY_LIST);
|
||||
indexCreated = true;
|
||||
// now add the mappings
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
|
@ -387,7 +381,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
throw e;
|
||||
}
|
||||
|
||||
indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()),
|
||||
indexService.getIndexEventListener().beforeIndexAddedToCluster(new Index(request.index()),
|
||||
indexMetaData.getSettings());
|
||||
|
||||
MetaData newMetaData = MetaData.builder(currentState.metaData())
|
||||
|
@ -433,29 +427,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private void addMappings(Map<String, Map<String, Object>> mappings, Path mappingsDir) throws IOException {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(mappingsDir)) {
|
||||
for (Path mappingFile : stream) {
|
||||
final String fileName = mappingFile.getFileName().toString();
|
||||
if (FileSystemUtils.isHidden(mappingFile)) {
|
||||
continue;
|
||||
}
|
||||
int lastDotIndex = fileName.lastIndexOf('.');
|
||||
String mappingType = lastDotIndex != -1 ? mappingFile.getFileName().toString().substring(0, lastDotIndex) : mappingFile.getFileName().toString();
|
||||
try (BufferedReader reader = Files.newBufferedReader(mappingFile, StandardCharsets.UTF_8)) {
|
||||
String mappingSource = Streams.copyToString(reader);
|
||||
if (mappings.containsKey(mappingType)) {
|
||||
XContentHelper.mergeDefaults(mappings.get(mappingType), parseMapping(mappingSource));
|
||||
} else {
|
||||
mappings.put(mappingType, parseMapping(mappingSource));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to read / parse mapping [" + mappingType + "] from location [" + mappingFile + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state, IndexTemplateFilter indexTemplateFilter) throws IOException {
|
||||
List<IndexTemplateMetaData> templates = new ArrayList<>();
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
|
||||
|
|
|
@ -36,10 +36,7 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Service responsible for submitting add and remove aliases requests
|
||||
|
@ -98,7 +95,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
|
|||
if (indexService == null) {
|
||||
// temporarily create the index and add mappings so we can parse the filter
|
||||
try {
|
||||
indexService = indicesService.createIndex(indexMetaData);
|
||||
indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST);
|
||||
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
|
||||
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, false);
|
||||
}
|
||||
|
|
|
@ -172,7 +172,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
IndexService indexService = indicesService.indexService(index);
|
||||
if (indexService == null) {
|
||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||
indexService = indicesService.createIndex(indexMetaData);
|
||||
indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST);
|
||||
removeIndex = true;
|
||||
Set<String> typesToIntroduce = new HashSet<>();
|
||||
for (MappingTask task : tasks) {
|
||||
|
@ -350,7 +350,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
continue;
|
||||
}
|
||||
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
IndexService indexService = indicesService.createIndex(indexMetaData);
|
||||
IndexService indexService = indicesService.createIndex(indexMetaData, Collections.EMPTY_LIST);
|
||||
indicesToClose.add(indexMetaData.getIndex());
|
||||
// make sure to add custom default mapping if exists
|
||||
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
|
||||
|
|
|
@ -17,231 +17,244 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices;
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* InternalIndicesLifecycle handles invoking each listener for the Index. All
|
||||
* exceptions thrown by listeners are logged and then re-thrown to stop further
|
||||
* index action.
|
||||
* A composite {@link IndexEventListener} that forwards all callbacks to an immutable list of IndexEventListener
|
||||
*/
|
||||
public class InternalIndicesLifecycle extends AbstractComponent implements IndicesLifecycle {
|
||||
final class CompositeIndexEventListener implements IndexEventListener {
|
||||
|
||||
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final List<IndexEventListener> listeners;
|
||||
private final ESLogger logger;
|
||||
|
||||
@Inject
|
||||
public InternalIndicesLifecycle(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
@Override
|
||||
public void addListener(Listener listener) {
|
||||
listeners.add(listener);
|
||||
CompositeIndexEventListener(String index, Settings indexSettings, Collection<IndexEventListener> listeners) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
if (listener == null) {
|
||||
throw new IllegalArgumentException("listeners must be non-null");
|
||||
}
|
||||
}
|
||||
this.listeners = Collections.unmodifiableList(new ArrayList<>(listeners));
|
||||
this.logger = Loggers.getLogger(getClass(), indexSettings, index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListener(Listener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
|
||||
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
|
||||
for (Listener listener : listeners) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.shardRoutingChanged(indexShard, oldRouting, newRouting);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke shard touring changed callback", t, indexShard.shardId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexAddedToCluster(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before index added to cluster callback", t, index.name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexCreated(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before index created callback", t, index.name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIndexCreated(IndexService indexService) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexCreated(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke after index created callback", t, indexService.index().name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardCreated(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke before shard created callback", t, shardId);
|
||||
throw t;
|
||||
logger.warn("[{}] failed to invoke shard touring changed callback", t, indexShard.shardId().getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardCreated(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardCreated(indexShard);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke after shard created callback", t, indexShard.shardId());
|
||||
logger.warn("[{}] failed to invoke after shard created callback", t, indexShard.shardId().getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void afterIndexShardStarted(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardStarted(indexShard);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke after shard started callback", t, indexShard.shardId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexClosed(IndexService indexService) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexClosed(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before index closed callback", t, indexService.index().name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexDeleted(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before index deleted callback", t, indexService.index().name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexDeleted(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke after index deleted callback", t, index.name());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexClosed(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke after index closed callback", t, index.name());
|
||||
logger.warn("[{}] failed to invoke after shard started callback", t, indexShard.shardId().getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardClosed(shardId, indexShard, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke before shard closed callback", t, shardId);
|
||||
logger.warn("[{}] failed to invoke before shard closed callback", t, shardId.getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardClosed(shardId, indexShard, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke after shard closed callback", t, shardId);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beforeIndexShardDeleted(ShardId shardId,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardDeleted(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke before shard deleted callback", t, shardId);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIndexShardDeleted(ShardId shardId,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardDeleted(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke after shard deleted callback", t, shardId);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, @Nullable String reason) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId());
|
||||
logger.warn("[{}] failed to invoke after shard closed callback", t, shardId.getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onShardInactive(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.onShardInactive(indexShard);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke on shard inactive callback", t, indexShard.shardId());
|
||||
logger.warn("[{}] failed to invoke on shard inactive callback", t, indexShard.shardId().getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke index shard state changed callback", t, indexShard.shardId().getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexCreated(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke before index created callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexCreated(IndexService indexService) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexCreated(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke after index created callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardCreated(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before shard created callback", t, shardId);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexClosed(IndexService indexService) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexClosed(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke before index closed callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexDeleted(indexService);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke before index deleted callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexDeleted(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke after index deleted callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexClosed(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke after index closed callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardDeleted(ShardId shardId,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardDeleted(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke before shard deleted callback", t, shardId.getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardDeleted(ShardId shardId,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardDeleted(shardId, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to invoke after shard deleted callback", t, shardId.getId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexAddedToCluster(index, indexSettings);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke before index added to cluster callback", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
|
@ -22,24 +22,61 @@ package org.elasticsearch.index;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.util.Providers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class IndexModule extends AbstractModule {
|
||||
|
||||
private final IndexMetaData indexMetaData;
|
||||
private final Settings settings;
|
||||
// pkg private so tests can mock
|
||||
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
|
||||
Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null;
|
||||
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
|
||||
private IndexEventListener listener;
|
||||
|
||||
public IndexModule(IndexMetaData indexMetaData) {
|
||||
|
||||
public IndexModule(Settings settings, IndexMetaData indexMetaData) {
|
||||
this.indexMetaData = indexMetaData;
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
public Settings getIndexSettings() {
|
||||
return settings;
|
||||
}
|
||||
|
||||
public void addIndexEventListener(IndexEventListener listener) {
|
||||
if (this.listener != null) {
|
||||
throw new IllegalStateException("can't add listener after listeners are frozen");
|
||||
}
|
||||
if (listener == null) {
|
||||
throw new IllegalArgumentException("listener must not be null");
|
||||
}
|
||||
if (indexEventListeners.contains(listener)) {
|
||||
throw new IllegalArgumentException("listener already added");
|
||||
}
|
||||
|
||||
this.indexEventListeners.add(listener);
|
||||
}
|
||||
|
||||
public IndexEventListener freeze() {
|
||||
// TODO somehow we need to make this pkg private...
|
||||
if (listener == null) {
|
||||
listener = new CompositeIndexEventListener(indexMetaData.getIndex(), settings, indexEventListeners);
|
||||
}
|
||||
return listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -50,12 +87,11 @@ public class IndexModule extends AbstractModule {
|
|||
} else {
|
||||
bind(IndexSearcherWrapper.class).to(indexSearcherWrapper).asEagerSingleton();
|
||||
}
|
||||
bind(IndexEventListener.class).toInstance(freeze());
|
||||
bind(IndexMetaData.class).toInstance(indexMetaData);
|
||||
bind(IndexService.class).asEagerSingleton();
|
||||
bind(IndexServicesProvider.class).asEagerSingleton();
|
||||
bind(MapperService.class).asEagerSingleton();
|
||||
bind(IndexFieldDataService.class).asEagerSingleton();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -46,28 +46,17 @@ import org.elasticsearch.index.mapper.MappedFieldType;
|
|||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
import org.elasticsearch.index.query.ParsedQuery;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShadowIndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.shard.*;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.AliasFilterParsingException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.indices.*;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -80,7 +69,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
|
|||
*/
|
||||
public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable<IndexShard> {
|
||||
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
private final IndexEventListener eventListener;
|
||||
private final AnalysisService analysisService;
|
||||
private final IndexFieldDataService indexFieldData;
|
||||
private final BitsetFilterCache bitsetFilterCache;
|
||||
|
@ -102,7 +91,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
BitsetFilterCache bitSetFilterCache,
|
||||
IndicesService indicesServices,
|
||||
IndexServicesProvider indexServicesProvider,
|
||||
IndexStore indexStore) {
|
||||
IndexStore indexStore,
|
||||
IndexEventListener eventListener) {
|
||||
super(index, settingsService.indexSettings());
|
||||
assert indexMetaData != null;
|
||||
this.analysisService = analysisService;
|
||||
|
@ -110,7 +100,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
this.settingsService = settingsService;
|
||||
this.bitsetFilterCache = bitSetFilterCache;
|
||||
this.indicesServices = indicesServices;
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indexServicesProvider.getIndicesLifecycle();
|
||||
this.eventListener = eventListener;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.indexServicesProvider = indexServicesProvider;
|
||||
this.indexStore = indexStore;
|
||||
|
@ -123,8 +113,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
return shards.size();
|
||||
}
|
||||
|
||||
public InternalIndicesLifecycle indicesLifecycle() {
|
||||
return this.indicesLifecycle;
|
||||
public IndexEventListener getIndexEventListener() {
|
||||
return this.eventListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -225,7 +215,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
|
||||
public synchronized IndexShard createShard(int sShardId, ShardRouting routing) throws IOException {
|
||||
final boolean primary = routing.primary();
|
||||
/*
|
||||
* TODO: we execute this in parallel but it's a synced method. Yet, we might
|
||||
|
@ -237,13 +227,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
}
|
||||
final Settings indexSettings = settingsService.getSettings();
|
||||
final ShardId shardId = new ShardId(index, sShardId);
|
||||
ShardLock lock = null;
|
||||
boolean success = false;
|
||||
Store store = null;
|
||||
IndexShard indexShard = null;
|
||||
final ShardLock lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
||||
try {
|
||||
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
||||
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
|
||||
eventListener.beforeIndexShardCreated(shardId, indexSettings);
|
||||
ShardPath path;
|
||||
try {
|
||||
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
|
||||
|
@ -293,20 +282,16 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
indexShard = new IndexShard(shardId, indexSettings, path, store, indexServicesProvider);
|
||||
}
|
||||
|
||||
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
|
||||
indicesLifecycle.afterIndexShardCreated(indexShard);
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
settingsService.addListener(indexShard);
|
||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||
success = true;
|
||||
return indexShard;
|
||||
} catch (IOException e) {
|
||||
ElasticsearchException ex = new ElasticsearchException("failed to create shard", e);
|
||||
ex.setShard(shardId);
|
||||
throw ex;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.closeWhileHandlingException(lock);
|
||||
closeShard("initialization failed", shardId, indexShard, store);
|
||||
closeShard("initialization failed", shardId, indexShard, store, eventListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -325,16 +310,16 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
HashMap<Integer, IndexShard> newShards = new HashMap<>(shards);
|
||||
indexShard = newShards.remove(shardId);
|
||||
shards = unmodifiableMap(newShards);
|
||||
closeShard(reason, sId, indexShard, indexShard.store());
|
||||
closeShard(reason, sId, indexShard, indexShard.store(), indexShard.getIndexEventListener());
|
||||
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
|
||||
}
|
||||
|
||||
private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store) {
|
||||
private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) {
|
||||
final int shardId = sId.id();
|
||||
final Settings indexSettings = settingsService.getSettings();
|
||||
try {
|
||||
try {
|
||||
indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings);
|
||||
listener.beforeIndexShardClosed(sId, indexShard, indexSettings);
|
||||
} finally {
|
||||
// this logic is tricky, we want to close the engine so we rollback the changes done to it
|
||||
// and close the shard so no operations are allowed to it
|
||||
|
@ -349,7 +334,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
}
|
||||
}
|
||||
// call this before we close the store, so we can release resources for it
|
||||
indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings);
|
||||
listener.afterIndexShardClosed(sId, indexShard, indexSettings);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
|
@ -367,10 +352,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
try {
|
||||
if (ownsShard) {
|
||||
try {
|
||||
indicesLifecycle.beforeIndexShardDeleted(lock.getShardId(), indexSettings);
|
||||
eventListener.beforeIndexShardDeleted(lock.getShardId(), indexSettings);
|
||||
} finally {
|
||||
indicesServices.deleteShardStore("delete index", lock, indexSettings);
|
||||
indicesLifecycle.afterIndexShardDeleted(lock.getShardId(), indexSettings);
|
||||
eventListener.afterIndexShardDeleted(lock.getShardId(), indexSettings);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -28,10 +28,10 @@ import org.elasticsearch.index.engine.EngineFactory;
|
|||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.termvectors.TermVectorsService;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||
|
@ -44,7 +44,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
*/
|
||||
public final class IndexServicesProvider {
|
||||
|
||||
private final IndicesLifecycle indicesLifecycle;
|
||||
private final ThreadPool threadPool;
|
||||
private final MapperService mapperService;
|
||||
private final IndexQueryParserService queryParserService;
|
||||
|
@ -59,10 +58,11 @@ public final class IndexServicesProvider {
|
|||
private final BigArrays bigArrays;
|
||||
private final IndexSearcherWrapper indexSearcherWrapper;
|
||||
private final IndexingMemoryController indexingMemoryController;
|
||||
private final IndexEventListener listener;
|
||||
|
||||
@Inject
|
||||
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) {
|
||||
this.indicesLifecycle = indicesLifecycle;
|
||||
public IndexServicesProvider(IndexEventListener listener, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) {
|
||||
this.listener = listener;
|
||||
this.threadPool = threadPool;
|
||||
this.mapperService = mapperService;
|
||||
this.queryParserService = queryParserService;
|
||||
|
@ -79,10 +79,9 @@ public final class IndexServicesProvider {
|
|||
this.indexingMemoryController = indexingMemoryController;
|
||||
}
|
||||
|
||||
public IndicesLifecycle getIndicesLifecycle() {
|
||||
return indicesLifecycle;
|
||||
public IndexEventListener getIndexEventListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public ThreadPool getThreadPool() {
|
||||
return threadPool;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
|
||||
/**
|
||||
* An index event listener is the primary extension point for plugins and build-in services
|
||||
* to react / listen to per-index and per-shard events. These listeners are registered per-index
|
||||
* via {@link org.elasticsearch.index.IndexModule#addIndexEventListener(IndexEventListener)}. All listeners have the same
|
||||
* lifecycle as the {@link IndexService} they are created for.
|
||||
* <p>
|
||||
* An IndexEventListener can be used across multiple indices and shards since all callback methods receive sufficient
|
||||
* local state via their arguments. Yet, if an instance is shared across indices they might be called concurrently and should not
|
||||
* modify local state without sufficient synchronization.
|
||||
* </p>
|
||||
*/
|
||||
public interface IndexEventListener {
|
||||
|
||||
/**
|
||||
* Called when the shard routing has changed state.
|
||||
*
|
||||
* @param indexShard The index shard
|
||||
* @param oldRouting The old routing state (can be null)
|
||||
* @param newRouting The new routing state
|
||||
*/
|
||||
default void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been created.
|
||||
*/
|
||||
default void afterIndexShardCreated(IndexShard indexShard) {}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been started.
|
||||
*/
|
||||
default void afterIndexShardStarted(IndexShard indexShard) {}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets closed.
|
||||
*
|
||||
* @param indexShard The index shard
|
||||
*/
|
||||
default void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been closed.
|
||||
*
|
||||
* @param shardId The shard id
|
||||
*/
|
||||
default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {}
|
||||
|
||||
|
||||
/**
|
||||
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
|
||||
* The order of concurrent events is preserved. The execution must be lightweight.
|
||||
*
|
||||
* @param indexShard the shard the new state was applied to
|
||||
* @param previousState the previous index shard state if there was one, null otherwise
|
||||
* @param currentState the new shard state
|
||||
* @param reason the reason for the state change if there is one, null otherwise
|
||||
*/
|
||||
default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {}
|
||||
|
||||
/**
|
||||
* Called when a shard is marked as inactive
|
||||
*
|
||||
* @param indexShard The shard that was marked inactive
|
||||
*/
|
||||
default void onShardInactive(IndexShard indexShard) {}
|
||||
|
||||
/**
|
||||
* Called before the index gets created. Note that this is also called
|
||||
* when the index is created on data nodes
|
||||
*/
|
||||
default void beforeIndexCreated(Index index, Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been created.
|
||||
*/
|
||||
default void afterIndexCreated(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets created.
|
||||
*/
|
||||
default void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Called before the index get closed.
|
||||
*
|
||||
* @param indexService The index service
|
||||
*/
|
||||
default void beforeIndexClosed(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been closed.
|
||||
*
|
||||
* @param index The index
|
||||
*/
|
||||
default void afterIndexClosed(Index index, Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets deleted from disk
|
||||
* Note: this method is only executed on the first attempt of deleting the shard. Retries are will not invoke
|
||||
* this method.
|
||||
* @param shardId The shard id
|
||||
* @param indexSettings the shards index settings
|
||||
*/
|
||||
default void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been deleted from disk.
|
||||
*
|
||||
* Note: this method is only called if the deletion of the shard did finish without an exception
|
||||
*
|
||||
* @param shardId The shard id
|
||||
* @param indexSettings the shards index settings
|
||||
*/
|
||||
default void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been deleted.
|
||||
* This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index, org.elasticsearch.common.settings.Settings)}
|
||||
* when an index is deleted
|
||||
*
|
||||
* @param index The index
|
||||
*/
|
||||
default void afterIndexDeleted(Index index, Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index gets deleted.
|
||||
* This listener method is invoked after
|
||||
* {@link #beforeIndexClosed(org.elasticsearch.index.IndexService)} when an index is deleted
|
||||
*
|
||||
* @param indexService The index service
|
||||
*/
|
||||
default void beforeIndexDeleted(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called on the Master node only before the {@link IndexService} instances is created to simulate an index creation.
|
||||
* This happens right before the index and it's metadata is registered in the cluster state
|
||||
*/
|
||||
default void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
|
||||
}
|
||||
}
|
|
@ -98,7 +98,6 @@ import org.elasticsearch.index.translog.TranslogWriter;
|
|||
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
||||
import org.elasticsearch.index.warmer.WarmerStats;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||
|
@ -125,7 +124,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
private final ThreadPool threadPool;
|
||||
private final MapperService mapperService;
|
||||
private final IndexCache indexCache;
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
private final Store store;
|
||||
private final MergeSchedulerConfig mergeSchedulerConfig;
|
||||
private final ShardIndexingService indexingService;
|
||||
|
@ -149,6 +147,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
private final TranslogConfig translogConfig;
|
||||
private final MergePolicyConfig mergePolicyConfig;
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
private final IndexEventListener indexEventListener;
|
||||
|
||||
private TimeValue refreshInterval;
|
||||
|
||||
|
@ -206,8 +205,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
this.similarityService = provider.getSimilarityService();
|
||||
Objects.requireNonNull(store, "Store must be provided to the index shard");
|
||||
this.engineFactory = provider.getFactory();
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) provider.getIndicesLifecycle();
|
||||
this.store = store;
|
||||
this.indexEventListener = provider.getIndexEventListener();
|
||||
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
|
||||
this.threadPool = provider.getThreadPool();
|
||||
this.mapperService = provider.getMapperService();
|
||||
|
@ -367,12 +366,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
}
|
||||
}
|
||||
if (movedToStarted) {
|
||||
indicesLifecycle.afterIndexShardStarted(this);
|
||||
indexEventListener.afterIndexShardStarted(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.shardRouting = newRouting;
|
||||
indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting);
|
||||
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
|
||||
} finally {
|
||||
if (persistState) {
|
||||
persistMetadata(newRouting, currentRouting);
|
||||
|
@ -431,7 +430,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason);
|
||||
IndexShardState previousState = state;
|
||||
state = newState;
|
||||
this.indicesLifecycle.indexShardStateChanged(this, previousState, reason);
|
||||
this.indexEventListener.indexShardStateChanged(this, previousState, newState, reason);
|
||||
return previousState;
|
||||
}
|
||||
|
||||
|
@ -1036,7 +1035,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
if (wasActive) {
|
||||
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
logger.debug("shard is now inactive");
|
||||
indicesLifecycle.onShardInactive(this);
|
||||
indexEventListener.onShardInactive(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1230,6 +1229,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
return percolatorQueriesRegistry.stats();
|
||||
}
|
||||
|
||||
public IndexEventListener getIndexEventListener() {
|
||||
return indexEventListener;
|
||||
}
|
||||
|
||||
class EngineRefresher implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -1,211 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
/**
|
||||
* A global component allowing to register for lifecycle of an index (create/closed) and
|
||||
* an index shard (created/closed).
|
||||
*/
|
||||
public interface IndicesLifecycle {
|
||||
|
||||
/**
|
||||
* Add a listener.
|
||||
*/
|
||||
void addListener(Listener listener);
|
||||
|
||||
/**
|
||||
* Remove a listener.
|
||||
*/
|
||||
void removeListener(Listener listener);
|
||||
|
||||
/**
|
||||
* A listener for index and index shard lifecycle events (create/closed).
|
||||
*/
|
||||
public abstract static class Listener {
|
||||
|
||||
/**
|
||||
* Called when the shard routing has changed state.
|
||||
*
|
||||
* @param indexShard The index shard
|
||||
* @param oldRouting The old routing state (can be null)
|
||||
* @param newRouting The new routing state
|
||||
*/
|
||||
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called on the Master node only before the index is created
|
||||
*/
|
||||
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index gets created. Note that this is also called
|
||||
* when the index is created on data nodes
|
||||
*/
|
||||
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been created.
|
||||
*/
|
||||
public void afterIndexCreated(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets created.
|
||||
*/
|
||||
public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been created.
|
||||
*/
|
||||
public void afterIndexShardCreated(IndexShard indexShard) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been started.
|
||||
*/
|
||||
public void afterIndexShardStarted(IndexShard indexShard) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index get closed.
|
||||
*
|
||||
* @param indexService The index service
|
||||
*/
|
||||
public void beforeIndexClosed(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been closed.
|
||||
*
|
||||
* @param index The index
|
||||
*/
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets closed.
|
||||
*
|
||||
* @param indexShard The index shard
|
||||
*/
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been closed.
|
||||
*
|
||||
* @param shardId The shard id
|
||||
*/
|
||||
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index shard gets deleted from disk
|
||||
* Note: this method is only executed on the first attempt of deleting the shard. Retries are will not invoke
|
||||
* this method.
|
||||
* @param shardId The shard id
|
||||
* @param indexSettings the shards index settings
|
||||
*/
|
||||
public void beforeIndexShardDeleted(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been deleted from disk.
|
||||
*
|
||||
* Note: this method is only called if the deletion of the shard did finish without an exception
|
||||
*
|
||||
* @param shardId The shard id
|
||||
* @param indexSettings the shards index settings
|
||||
*/
|
||||
public void afterIndexShardDeleted(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
|
||||
* The order of concurrent events is preserved. The execution must be lightweight.
|
||||
*
|
||||
* @param indexShard the shard the new state was applied to
|
||||
* @param previousState the previous index shard state if there was one, null otherwise
|
||||
* @param currentState the new shard state
|
||||
* @param reason the reason for the state change if there is one, null otherwise
|
||||
*/
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the index has been deleted.
|
||||
* This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index, org.elasticsearch.common.settings.Settings)}
|
||||
* when an index is deleted
|
||||
*
|
||||
* @param index The index
|
||||
*/
|
||||
public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index gets deleted.
|
||||
* This listener method is invoked after
|
||||
* {@link #beforeIndexClosed(org.elasticsearch.index.IndexService)} when an index is deleted
|
||||
*
|
||||
* @param indexService The index service
|
||||
*/
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a shard is marked as inactive
|
||||
*
|
||||
* @param indexShard The shard that was marked inactive
|
||||
*/
|
||||
public void onShardInactive(IndexShard indexShard) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -52,15 +52,13 @@ import org.elasticsearch.indices.ttl.IndicesTTLService;
|
|||
*/
|
||||
public class IndicesModule extends AbstractModule {
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
private final ExtensionPoint.ClassSet<QueryParser> queryParsers
|
||||
= new ExtensionPoint.ClassSet<>("query_parser", QueryParser.class);
|
||||
private final ExtensionPoint.InstanceMap<String, Dictionary> hunspellDictionaries
|
||||
= new ExtensionPoint.InstanceMap<>("hunspell_dictionary", String.class, Dictionary.class);
|
||||
|
||||
public IndicesModule(Settings settings) {
|
||||
this.settings = settings;
|
||||
public IndicesModule() {
|
||||
registerBuiltinQueryParsers();
|
||||
}
|
||||
|
||||
|
@ -130,7 +128,6 @@ public class IndicesModule extends AbstractModule {
|
|||
bindQueryParsersExtension();
|
||||
bindHunspellExtension();
|
||||
|
||||
bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton();
|
||||
bind(IndicesService.class).asEagerSingleton();
|
||||
bind(RecoverySettings.class).asEagerSingleton();
|
||||
bind(RecoveryTarget.class).asEagerSingleton();
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.elasticsearch.index.settings.IndexSettings;
|
|||
import org.elasticsearch.index.settings.IndexSettingsModule;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.similarity.SimilarityModule;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
|
@ -86,10 +87,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -106,9 +104,6 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
|
|||
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService> {
|
||||
|
||||
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
|
||||
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
|
||||
private final IndicesAnalysisService indicesAnalysisService;
|
||||
|
||||
private final Injector injector;
|
||||
|
@ -142,13 +137,11 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
private final OldShardsStats oldShardsStats = new OldShardsStats();
|
||||
|
||||
@Inject
|
||||
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv) {
|
||||
public IndicesService(Settings settings, IndicesAnalysisService indicesAnalysisService, Injector injector, PluginsService pluginsService, NodeEnvironment nodeEnv) {
|
||||
super(settings);
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||
this.indicesAnalysisService = indicesAnalysisService;
|
||||
this.injector = injector;
|
||||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
this.indicesLifecycle.addListener(oldShardsStats);
|
||||
this.pluginsService = pluginsService;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
|
||||
}
|
||||
|
@ -195,10 +188,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
indicesAnalysisService);
|
||||
}
|
||||
|
||||
public IndicesLifecycle indicesLifecycle() {
|
||||
return this.indicesLifecycle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the node stats indices stats. The <tt>includePrevious</tt> flag controls
|
||||
* if old shards stats will be aggregated as well (only for relevant stats, such as
|
||||
|
@ -305,7 +294,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
return indexService;
|
||||
}
|
||||
|
||||
public synchronized IndexService createIndex(IndexMetaData indexMetaData) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link IndexService} for the given metadata.
|
||||
* @param indexMetaData the index metadata to create the index for
|
||||
* @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners
|
||||
* @throws IndexAlreadyExistsException if the index already exists.
|
||||
*/
|
||||
public synchronized IndexService createIndex(IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) {
|
||||
if (!lifecycle.started()) {
|
||||
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
|
||||
}
|
||||
|
@ -314,9 +310,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
if (indices.containsKey(index.name())) {
|
||||
throw new IndexAlreadyExistsException(index);
|
||||
}
|
||||
|
||||
indicesLifecycle.beforeIndexCreated(index, settings);
|
||||
|
||||
logger.debug("creating Index [{}], shards [{}]/[{}{}]",
|
||||
indexMetaData.getIndex(),
|
||||
settings.get(SETTING_NUMBER_OF_SHARDS),
|
||||
|
@ -335,12 +328,19 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
for (Module pluginModule : pluginsService.indexModules(indexSettings)) {
|
||||
modules.add(pluginModule);
|
||||
}
|
||||
final IndexModule indexModule = new IndexModule(settings, indexMetaData);
|
||||
for (IndexEventListener listener : builtInListeners) {
|
||||
indexModule.addIndexEventListener(listener);
|
||||
}
|
||||
indexModule.addIndexEventListener(oldShardsStats);
|
||||
modules.add(new IndexStoreModule(indexSettings));
|
||||
modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));
|
||||
modules.add(new SimilarityModule(index, indexSettings));
|
||||
modules.add(new IndexCacheModule(indexSettings));
|
||||
modules.add(new IndexModule(indexMetaData));
|
||||
modules.add(indexModule);
|
||||
pluginsService.processModules(modules);
|
||||
final IndexEventListener listener = indexModule.freeze();
|
||||
listener.beforeIndexCreated(index, settings);
|
||||
|
||||
Injector indexInjector;
|
||||
try {
|
||||
|
@ -352,9 +352,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
}
|
||||
|
||||
IndexService indexService = indexInjector.getInstance(IndexService.class);
|
||||
|
||||
indicesLifecycle.afterIndexCreated(indexService);
|
||||
|
||||
assert indexService.getIndexEventListener() == listener;
|
||||
listener.afterIndexCreated(indexService);
|
||||
indices = newMapBuilder(indices).put(index.name(), new IndexServiceInjectorPair(indexService, indexInjector)).immutableMap();
|
||||
return indexService;
|
||||
}
|
||||
|
@ -373,6 +372,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
try {
|
||||
final IndexService indexService;
|
||||
final Injector indexInjector;
|
||||
final IndexEventListener listener;
|
||||
synchronized (this) {
|
||||
if (indices.containsKey(index) == false) {
|
||||
return;
|
||||
|
@ -384,11 +384,12 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
indexService = remove.getIndexService();
|
||||
indexInjector = remove.getInjector();
|
||||
indices = unmodifiableMap(newIndices);
|
||||
listener = indexService.getIndexEventListener();
|
||||
}
|
||||
|
||||
indicesLifecycle.beforeIndexClosed(indexService);
|
||||
listener.beforeIndexClosed(indexService);
|
||||
if (delete) {
|
||||
indicesLifecycle.beforeIndexDeleted(indexService);
|
||||
listener.beforeIndexDeleted(indexService);
|
||||
}
|
||||
Stream<Closeable> closeables = pluginsService.indexServices().stream().map(p -> indexInjector.getInstance(p));
|
||||
IOUtils.close(closeables::iterator);
|
||||
|
@ -412,10 +413,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
indexInjector.getInstance(IndexStore.class).close();
|
||||
|
||||
logger.debug("[{}] closed... (reason [{}])", index, reason);
|
||||
indicesLifecycle.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings());
|
||||
listener.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings());
|
||||
if (delete) {
|
||||
final Settings indexSettings = indexService.getIndexSettings();
|
||||
indicesLifecycle.afterIndexDeleted(indexService.index(), indexSettings);
|
||||
listener.afterIndexDeleted(indexService.index(), indexSettings);
|
||||
// now we are done - try to wipe data on disk if possible
|
||||
deleteIndexStore(reason, indexService.index(), indexSettings, false);
|
||||
}
|
||||
|
@ -424,7 +425,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
}
|
||||
}
|
||||
|
||||
static class OldShardsStats extends IndicesLifecycle.Listener {
|
||||
static class OldShardsStats implements IndexEventListener {
|
||||
|
||||
final SearchStats searchStats = new SearchStats();
|
||||
final GetStats getStats = new GetStats();
|
||||
|
|
|
@ -51,17 +51,17 @@ import org.elasticsearch.index.settings.IndexSettingsService;
|
|||
import org.elasticsearch.index.shard.*;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRepository;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.flush.SyncedFlushService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||
import org.elasticsearch.indices.recovery.RecoverySource;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
|
@ -101,14 +101,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
private final FailedShardHandler failedShardHandler = new FailedShardHandler();
|
||||
|
||||
private final boolean sendRefreshMapping;
|
||||
private final List<IndexEventListener> buildInIndexListener;
|
||||
|
||||
@Inject
|
||||
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
|
||||
ThreadPool threadPool, RecoveryTarget recoveryTarget,
|
||||
ShardStateAction shardStateAction,
|
||||
NodeIndexDeletedAction nodeIndexDeletedAction,
|
||||
NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService) {
|
||||
NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, RecoverySource recoverySource) {
|
||||
super(settings);
|
||||
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -299,7 +301,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
logger.debug("[{}] creating index", indexMetaData.getIndex());
|
||||
}
|
||||
try {
|
||||
indicesService.createIndex(indexMetaData);
|
||||
indicesService.createIndex(indexMetaData, buildInIndexListener);
|
||||
} catch (Throwable e) {
|
||||
sendFailShard(shard, indexMetaData.getIndexUUID(), "failed to create index", e);
|
||||
}
|
||||
|
|
|
@ -41,11 +41,11 @@ import org.elasticsearch.common.util.concurrent.CountDown;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
|
@ -63,7 +63,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class SyncedFlushService extends AbstractComponent {
|
||||
public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
|
||||
|
||||
private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
|
||||
private static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
|
||||
|
@ -85,25 +85,24 @@ public class SyncedFlushService extends AbstractComponent {
|
|||
transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
|
||||
transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, SyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
|
||||
transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler());
|
||||
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
|
||||
@Override
|
||||
public void onShardInactive(final IndexShard indexShard) {
|
||||
// we only want to call sync flush once, so only trigger it when we are on a primary
|
||||
if (indexShard.routingEntry().primary()) {
|
||||
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
|
||||
@Override
|
||||
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
|
||||
logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onShardInactive(final IndexShard indexShard) {
|
||||
// we only want to call sync flush once, so only trigger it when we are on a primary
|
||||
if (indexShard.routingEntry().primary()) {
|
||||
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
|
||||
@Override
|
||||
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
|
||||
logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,9 +30,9 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -50,7 +50,7 @@ import java.util.Set;
|
|||
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
|
||||
* source shard to the target shard.
|
||||
*/
|
||||
public class RecoverySource extends AbstractComponent {
|
||||
public class RecoverySource extends AbstractComponent implements IndexEventListener{
|
||||
|
||||
public static class Actions {
|
||||
public static final String START_RECOVERY = "internal:index/shard/recovery/start_recovery";
|
||||
|
@ -72,21 +72,18 @@ public class RecoverySource extends AbstractComponent {
|
|||
this.transportService = transportService;
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null) {
|
||||
ongoingRecoveries.cancel(indexShard, "shard is closed");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.recoverySettings = recoverySettings;
|
||||
|
||||
transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null) {
|
||||
ongoingRecoveries.cancel(indexShard, "shard is closed");
|
||||
}
|
||||
}
|
||||
|
||||
private RecoveryResponse recover(final StartRecoveryRequest request) {
|
||||
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().index().name());
|
||||
final IndexShard shard = indexService.getShard(request.shardId().id());
|
||||
|
|
|
@ -48,7 +48,7 @@ import org.elasticsearch.index.mapper.MapperException;
|
|||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.*;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
|
@ -67,7 +67,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
|||
* Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and
|
||||
* not several of them (since we don't allocate several shard replicas to the same node).
|
||||
*/
|
||||
public class RecoveryTarget extends AbstractComponent {
|
||||
public class RecoveryTarget extends AbstractComponent implements IndexEventListener {
|
||||
|
||||
public static class Actions {
|
||||
public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
|
||||
|
@ -88,8 +88,7 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
private final RecoveriesCollection onGoingRecoveries;
|
||||
|
||||
@Inject
|
||||
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings, ClusterService clusterService) {
|
||||
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
|
@ -103,16 +102,14 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
|
||||
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new TranslogOperationsRequestHandler());
|
||||
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new FinalizeRecoveryRequestHandler());
|
||||
}
|
||||
|
||||
indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null) {
|
||||
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
|
||||
}
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null) {
|
||||
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -181,7 +181,7 @@ public class Node implements Releasable {
|
|||
if (settings.getAsBoolean(HTTP_ENABLED, true)) {
|
||||
modules.add(new HttpServerModule(settings));
|
||||
}
|
||||
modules.add(new IndicesModule(settings));
|
||||
modules.add(new IndicesModule());
|
||||
modules.add(new SearchModule(settings));
|
||||
modules.add(new ActionModule(false));
|
||||
modules.add(new MonitorModule(settings));
|
||||
|
|
|
@ -65,8 +65,8 @@ import org.elasticsearch.index.query.QueryParseContext;
|
|||
import org.elasticsearch.index.search.stats.ShardSearchStats;
|
||||
import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
|
||||
|
@ -119,7 +119,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||
public class SearchService extends AbstractLifecycleComponent<SearchService> implements IndexEventListener {
|
||||
|
||||
public static final String NORMS_LOADING_KEY = "index.norms.loading";
|
||||
public static final String DEFAULT_KEEPALIVE_KEY = "search.default_keep_alive";
|
||||
|
@ -173,27 +173,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.indicesService = indicesService;
|
||||
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
|
||||
@Override
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
// once an index is closed we can just clean up all the pending search context information
|
||||
// to release memory and let references to the filesystem go etc.
|
||||
IndexMetaData idxMeta = SearchService.this.clusterService.state().metaData().index(index.getName());
|
||||
if (idxMeta != null && idxMeta.getState() == IndexMetaData.State.CLOSE) {
|
||||
// we need to check if it's really closed
|
||||
// since sometimes due to a relocation we already closed the shard and that causes the index to be closed
|
||||
// if we then close all the contexts we can get some search failures along the way which are not expected.
|
||||
// it's fine to keep the contexts open if the index is still "alive"
|
||||
// unfortunately we don't have a clear way to signal today why an index is closed.
|
||||
afterIndexDeleted(index, indexSettings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
|
||||
freeAllContextForIndex(index);
|
||||
}
|
||||
});
|
||||
this.indicesWarmer = indicesWarmer;
|
||||
this.scriptService = scriptService;
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
|
@ -235,6 +214,26 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
// once an index is closed we can just clean up all the pending search context information
|
||||
// to release memory and let references to the filesystem go etc.
|
||||
IndexMetaData idxMeta = SearchService.this.clusterService.state().metaData().index(index.getName());
|
||||
if (idxMeta != null && idxMeta.getState() == IndexMetaData.State.CLOSE) {
|
||||
// we need to check if it's really closed
|
||||
// since sometimes due to a relocation we already closed the shard and that causes the index to be closed
|
||||
// if we then close all the contexts we can get some search failures along the way which are not expected.
|
||||
// it's fine to keep the contexts open if the index is still "alive"
|
||||
// unfortunately we don't have a clear way to signal today why an index is closed.
|
||||
afterIndexDeleted(index, indexSettings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
|
||||
freeAllContextForIndex(index);
|
||||
}
|
||||
|
||||
protected void putContext(SearchContext context) {
|
||||
final SearchContext previous = activeContexts.put(context.id(), context);
|
||||
assert previous == null;
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.elasticsearch.index.shard.IndexShard;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.test.store.MockFSIndexStore;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -148,7 +148,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
|
|||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
assertAcked(prepareCreate(index).setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "5")
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false)
|
||||
));
|
||||
indexRandomData(index);
|
||||
ensureGreen(index);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
|
|||
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.test.store.MockFSIndexStore;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
|
@ -323,7 +324,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
|
|||
public void testReusePeerRecovery() throws Exception {
|
||||
final Settings settings = settingsBuilder()
|
||||
.put("action.admin.cluster.node.shutdown.delay", "10ms")
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put("gateway.recover_after_nodes", 4)
|
||||
|
||||
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4)
|
||||
|
|
|
@ -27,31 +27,46 @@ import org.elasticsearch.index.engine.EngineConfig;
|
|||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.test.engine.MockEngineFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class IndexModuleTests extends ModuleTestCase {
|
||||
|
||||
public void testWrapperIsBound() {
|
||||
IndexModule module = new IndexModule(IndexMetaData.PROTO);
|
||||
IndexModule module = new IndexModule(Settings.EMPTY, IndexMetaData.PROTO);
|
||||
assertInstanceBinding(module, IndexSearcherWrapper.class,(x) -> x == null);
|
||||
module.indexSearcherWrapper = Wrapper.class;
|
||||
assertBinding(module, IndexSearcherWrapper.class, Wrapper.class);
|
||||
}
|
||||
|
||||
public void testEngineFactoryBound() {
|
||||
IndexModule module = new IndexModule(IndexMetaData.PROTO);
|
||||
IndexModule module = new IndexModule(Settings.EMPTY,IndexMetaData.PROTO);
|
||||
assertBinding(module, EngineFactory.class, InternalEngineFactory.class);
|
||||
module.engineFactoryImpl = MockEngineFactory.class;
|
||||
assertBinding(module, EngineFactory.class, MockEngineFactory.class);
|
||||
}
|
||||
|
||||
public void testOtherServiceBound() {
|
||||
final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
|
||||
final IndexEventListener listener = new IndexEventListener() {
|
||||
@Override
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
atomicBoolean.set(true);
|
||||
}
|
||||
};
|
||||
final IndexMetaData meta = IndexMetaData.builder(IndexMetaData.PROTO).index("foo").build();
|
||||
IndexModule module = new IndexModule(meta);
|
||||
IndexModule module = new IndexModule(Settings.EMPTY,meta);
|
||||
module.addIndexEventListener(listener);
|
||||
assertBinding(module, IndexService.class, IndexService.class);
|
||||
assertBinding(module, IndexServicesProvider.class, IndexServicesProvider.class);
|
||||
assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta);
|
||||
assertInstanceBinding(module, IndexEventListener.class, (x) -> {x.beforeIndexDeleted(null); return atomicBoolean.get();});
|
||||
}
|
||||
|
||||
public static final class Wrapper extends IndexSearcherWrapper {
|
||||
|
|
|
@ -52,7 +52,7 @@ public class AnalysisTestsHelper {
|
|||
if (settings.get(IndexMetaData.SETTING_VERSION_CREATED) == null) {
|
||||
settings = Settings.builder().put(settings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
}
|
||||
IndicesModule indicesModule = new IndicesModule(settings) {
|
||||
IndicesModule indicesModule = new IndicesModule() {
|
||||
@Override
|
||||
public void configure() {
|
||||
// skip services
|
||||
|
|
|
@ -173,7 +173,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
|
|||
new EnvironmentModule(new Environment(settings)),
|
||||
new SettingsModule(settings),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new IndicesModule(settings) {
|
||||
new IndicesModule() {
|
||||
@Override
|
||||
public void configure() {
|
||||
// skip services
|
||||
|
|
|
@ -86,7 +86,7 @@ public class TemplateQueryParserTests extends ESTestCase {
|
|||
new EnvironmentModule(new Environment(settings)),
|
||||
new SettingsModule(settings),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new IndicesModule(settings) {
|
||||
new IndicesModule() {
|
||||
@Override
|
||||
public void configure() {
|
||||
// skip services
|
||||
|
|
|
@ -763,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
assertEquals(total + 1, shard.flushStats().getTotal());
|
||||
}
|
||||
|
||||
public void testRecoverFromStore() {
|
||||
public void testRecoverFromStore() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
|
@ -1039,7 +1039,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
shard.close("simon says", true);
|
||||
IndexServicesProvider indexServices = indexService.getIndexServices();
|
||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndexEventListener(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -52,12 +51,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.MergePolicyConfig;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.index.shard.*;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
|
@ -67,7 +61,8 @@ import org.elasticsearch.snapshots.SnapshotState;
|
|||
import org.elasticsearch.test.CorruptionUtils;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.test.MockIndexEventListener;
|
||||
import org.elasticsearch.test.store.MockFSIndexStore;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -125,7 +120,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockTransportService.TestPlugin.class);
|
||||
return pluginList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,7 +140,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
|
||||
.put("indices.recovery.concurrent_streams", 10)
|
||||
));
|
||||
|
@ -194,7 +189,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
*/
|
||||
final CountDownLatch latch = new CountDownLatch(numShards * 3); // primary + 2 replicas
|
||||
final CopyOnWriteArrayList<Throwable> exception = new CopyOnWriteArrayList<>();
|
||||
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
|
||||
final IndexEventListener listener = new IndexEventListener() {
|
||||
@Override
|
||||
public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, @IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null) {
|
||||
|
@ -225,16 +220,16 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
for (IndicesService service : internalCluster().getDataNodeInstances(IndicesService.class)) {
|
||||
service.indicesLifecycle().addListener(listener);
|
||||
for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
|
||||
eventListener.setNewDelegate(listener);
|
||||
}
|
||||
try {
|
||||
client().admin().indices().prepareDelete("test").get();
|
||||
latch.await();
|
||||
assertThat(exception, empty());
|
||||
} finally {
|
||||
for (IndicesService service : internalCluster().getDataNodeInstances(IndicesService.class)) {
|
||||
service.indicesLifecycle().removeListener(listener);
|
||||
for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
|
||||
eventListener.setNewDelegate(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -250,7 +245,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
|
||||
.put("indices.recovery.concurrent_streams", 10)
|
||||
));
|
||||
|
@ -395,7 +390,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
|
||||
// This does corrupt files on the replica, so we can't check:
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
|
||||
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
|
||||
));
|
||||
|
@ -476,7 +471,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
|
||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
|
||||
.put("indices.recovery.concurrent_streams", 10)
|
||||
));
|
||||
|
@ -531,7 +526,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
|
|||
.put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
|
||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
|
||||
.put("indices.recovery.concurrent_streams", 10)
|
||||
));
|
||||
|
|
|
@ -31,14 +31,19 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.MockIndexEventListener;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -63,6 +68,12 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockIndexEventListener.TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testBeforeIndexAddedToCluster() throws Exception {
|
||||
String node1 = internalCluster().startNode();
|
||||
String node2 = internalCluster().startNode();
|
||||
|
@ -71,7 +82,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
final AtomicInteger beforeAddedCount = new AtomicInteger(0);
|
||||
final AtomicInteger allCreatedCount = new AtomicInteger(0);
|
||||
|
||||
IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
|
||||
IndexEventListener listener = new IndexEventListener() {
|
||||
@Override
|
||||
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
|
||||
beforeAddedCount.incrementAndGet();
|
||||
|
@ -86,9 +97,9 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(listener);
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(listener);
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node3).addListener(listener);
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1).setNewDelegate(listener);
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(listener);
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node3).setNewDelegate(listener);
|
||||
|
||||
client().admin().indices().prepareCreate("test")
|
||||
.setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
|
||||
|
@ -115,7 +126,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
client().admin().indices().prepareCreate("index1").setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0).get();
|
||||
ensureGreen("index1");
|
||||
String node2 = internalCluster().startNode();
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(new IndexShardStateChangeListener() {
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(new IndexShardStateChangeListener() {
|
||||
@Override
|
||||
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
|
||||
throw new RuntimeException("FAIL");
|
||||
|
@ -134,7 +145,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
String node1 = internalCluster().startNode();
|
||||
IndexShardStateChangeListener stateChangeListenerNode1 = new IndexShardStateChangeListener();
|
||||
//add a listener that keeps track of the shard state changes
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1);
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1).setNewDelegate(stateChangeListenerNode1);
|
||||
|
||||
//create an index that should fail
|
||||
try {
|
||||
|
@ -165,7 +176,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
String node2 = internalCluster().startNode();
|
||||
IndexShardStateChangeListener stateChangeListenerNode2 = new IndexShardStateChangeListener();
|
||||
//add a listener that keeps track of the shard state changes
|
||||
internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(stateChangeListenerNode2);
|
||||
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(stateChangeListenerNode2);
|
||||
//re-enable allocation
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
||||
.setPersistentSettings(builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")));
|
||||
|
@ -226,7 +237,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
|
|||
stateChangeListener.shardStates.clear();
|
||||
}
|
||||
|
||||
private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener {
|
||||
private static class IndexShardStateChangeListener implements IndexEventListener {
|
||||
//we keep track of all the states (ordered) a shard goes through
|
||||
final ConcurrentMap<ShardId, List<IndexShardState>> shardStates = new ConcurrentHashMap<>();
|
||||
Settings creationSettings = Settings.EMPTY;
|
||||
|
|
|
@ -18,13 +18,22 @@
|
|||
*/
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
|
@ -32,17 +41,17 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
||||
public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCase {
|
||||
@Override
|
||||
protected boolean resetNodeAfterTest() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void testCloseDeleteCallback() throws Throwable {
|
||||
final AtomicInteger counter = new AtomicInteger(1);
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
assertAcked(client().admin().indices().prepareCreate("test")
|
||||
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
ensureGreen();
|
||||
getInstanceFromNode(IndicesLifecycle.class).addListener(new IndicesLifecycle.Listener() {
|
||||
IndexMetaData metaData = indicesService.indexService("test").getMetaData();
|
||||
ShardRouting shardRouting = indicesService.indexService("test").getShard(0).routingEntry();
|
||||
assertAcked(client().admin().indices().prepareDelete("test").get());
|
||||
final AtomicInteger counter = new AtomicInteger(1);
|
||||
IndexEventListener countingListener = new IndexEventListener() {
|
||||
@Override
|
||||
public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
|
||||
assertEquals(counter.get(), 5);
|
||||
|
@ -62,7 +71,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
|
|||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
assertEquals(counter.get(), 2);
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
|
@ -78,8 +87,19 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
|
|||
assertEquals(counter.get(), 4);
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
});
|
||||
assertAcked(client().admin().indices().prepareDelete("test").get());
|
||||
};
|
||||
IndexService index = indicesService.createIndex(metaData, Arrays.asList(countingListener));
|
||||
ShardRouting newRouting = new ShardRouting(shardRouting);
|
||||
String nodeId = newRouting.currentNodeId();
|
||||
ShardRoutingHelper.moveToUnassigned(newRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom"));
|
||||
ShardRoutingHelper.initialize(newRouting, nodeId);
|
||||
IndexShard shard = index.createShard(0, newRouting);
|
||||
shard.updateRoutingEntry(newRouting, true);
|
||||
shard.recoverFromStore(newRouting, new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT));
|
||||
newRouting = new ShardRouting(newRouting);
|
||||
ShardRoutingHelper.moveToStarted(newRouting);
|
||||
shard.updateRoutingEntry(newRouting, true);
|
||||
indicesService.deleteIndex("test", "simon says");
|
||||
assertEquals(7, counter.get());
|
||||
}
|
||||
|
||||
|
|
|
@ -48,13 +48,13 @@ public class IndicesModuleTests extends ModuleTestCase {
|
|||
}
|
||||
|
||||
public void testRegisterQueryParser() {
|
||||
IndicesModule module = new IndicesModule(Settings.EMPTY);
|
||||
IndicesModule module = new IndicesModule();
|
||||
module.registerQueryParser(FakeQueryParser.class);
|
||||
assertSetMultiBinding(module, QueryParser.class, FakeQueryParser.class);
|
||||
}
|
||||
|
||||
public void testRegisterQueryParserDuplicate() {
|
||||
IndicesModule module = new IndicesModule(Settings.EMPTY);
|
||||
IndicesModule module = new IndicesModule();
|
||||
try {
|
||||
module.registerQueryParser(TermQueryParser.class);
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
@ -63,7 +63,7 @@ public class IndicesModuleTests extends ModuleTestCase {
|
|||
}
|
||||
|
||||
public void testRegisterHunspellDictionary() throws Exception {
|
||||
IndicesModule module = new IndicesModule(Settings.EMPTY);
|
||||
IndicesModule module = new IndicesModule();
|
||||
InputStream aff = getClass().getResourceAsStream("/indices/analyze/conf_dir/hunspell/en_US/en_US.aff");
|
||||
InputStream dic = getClass().getResourceAsStream("/indices/analyze/conf_dir/hunspell/en_US/en_US.dic");
|
||||
Dictionary dictionary = new Dictionary(aff, dic);
|
||||
|
@ -72,7 +72,7 @@ public class IndicesModuleTests extends ModuleTestCase {
|
|||
}
|
||||
|
||||
public void testRegisterHunspellDictionaryDuplicate() {
|
||||
IndicesModule module = new IndicesModule(Settings.EMPTY);
|
||||
IndicesModule module = new IndicesModule();
|
||||
try {
|
||||
module.registerQueryParser(TermQueryParser.class);
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
|
|
@ -40,10 +40,10 @@ import org.elasticsearch.common.Priority;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -53,6 +53,7 @@ import org.elasticsearch.test.BackgroundIndexer;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.MockIndexEventListener;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
@ -89,9 +90,11 @@ import static org.hamcrest.Matchers.startsWith;
|
|||
public class RelocationIT extends ESIntegTestCase {
|
||||
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockTransportService.TestPlugin.class);
|
||||
return pluginList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testSimpleRelocationNoIndexing() {
|
||||
|
@ -282,16 +285,16 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
final Semaphore postRecoveryShards = new Semaphore(0);
|
||||
|
||||
for (IndicesLifecycle indicesLifecycle : internalCluster().getInstances(IndicesLifecycle.class)) {
|
||||
indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
if (currentState == IndexShardState.POST_RECOVERY) {
|
||||
postRecoveryShards.release();
|
||||
}
|
||||
final IndexEventListener listener = new IndexEventListener() {
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
if (currentState == IndexShardState.POST_RECOVERY) {
|
||||
postRecoveryShards.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
for (MockIndexEventListener.TestEventListener eventListener : internalCluster().getInstances(MockIndexEventListener.TestEventListener.class)) {
|
||||
eventListener.setNewDelegate(listener);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.search.sort.SortOrder;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.test.store.MockFSIndexStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -103,7 +104,7 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
|
|||
} else {
|
||||
Settings.Builder settings = settingsBuilder()
|
||||
.put("index.number_of_replicas", randomIntBetween(0, 1))
|
||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false)
|
||||
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
|
||||
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
|
||||
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
|
||||
|
|
|
@ -81,7 +81,7 @@ public class SearchSourceBuilderTests extends ESTestCase {
|
|||
injector = new ModulesBuilder().add(
|
||||
new SettingsModule(settings),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new IndicesModule(settings) {
|
||||
new IndicesModule() {
|
||||
@Override
|
||||
public void configure() {
|
||||
// skip services
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* This is a testing plugin that registers a generic {@link org.elasticsearch.test.MockIndexEventListener.TestEventListener} as a node level service as well as a listener
|
||||
* on every index. Tests can access it like this:
|
||||
* <pre>
|
||||
* TestEventListener listener = internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node1);
|
||||
* listener.setNewDelegate(new IndexEventListener() {
|
||||
* // do some stuff
|
||||
* });
|
||||
* </pre>
|
||||
* This allows tests to use the listener without registering their own plugins.
|
||||
*/
|
||||
public final class MockIndexEventListener {
|
||||
|
||||
public static class TestPlugin extends Plugin {
|
||||
private final TestEventListener listener = new TestEventListener();
|
||||
@Override
|
||||
public String name() {
|
||||
return "mock-index-listener";
|
||||
}
|
||||
@Override
|
||||
public String description() {
|
||||
return "a mock index listener for testing only";
|
||||
}
|
||||
|
||||
public void onModule(IndexModule module) {
|
||||
module.addIndexEventListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Module> nodeModules() {
|
||||
return Collections.singleton(binder -> binder.bind(TestEventListener.class).toInstance(listener));
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestEventListener implements IndexEventListener {
|
||||
private volatile IndexEventListener delegate = new IndexEventListener() {};
|
||||
|
||||
public void setNewDelegate(IndexEventListener listener) {
|
||||
delegate = listener == null ? new IndexEventListener() {} : listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
|
||||
delegate.shardRoutingChanged(indexShard, oldRouting, newRouting);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardCreated(IndexShard indexShard) {
|
||||
delegate.afterIndexShardCreated(indexShard);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardStarted(IndexShard indexShard) {
|
||||
delegate.afterIndexShardStarted(indexShard);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
|
||||
delegate.beforeIndexShardClosed(shardId, indexShard, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
|
||||
delegate.afterIndexShardClosed(shardId, indexShard, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
delegate.indexShardStateChanged(indexShard, previousState, currentState, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onShardInactive(IndexShard indexShard) {
|
||||
delegate.onShardInactive(indexShard);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexCreated(Index index, Settings indexSettings) {
|
||||
delegate.beforeIndexCreated(index, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexCreated(IndexService indexService) {
|
||||
delegate.afterIndexCreated(indexService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
|
||||
delegate.beforeIndexShardCreated(shardId, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexClosed(IndexService indexService) {
|
||||
delegate.beforeIndexClosed(indexService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexClosed(Index index, Settings indexSettings) {
|
||||
delegate.afterIndexClosed(index, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) {
|
||||
delegate.beforeIndexShardDeleted(shardId, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) {
|
||||
delegate.afterIndexShardDeleted(shardId, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexDeleted(Index index, Settings indexSettings) {
|
||||
delegate.afterIndexDeleted(index, indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexDeleted(IndexService indexService) {
|
||||
delegate.beforeIndexDeleted(indexService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
|
||||
delegate.beforeIndexAddedToCluster(index, indexSettings);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,11 +27,11 @@ import org.apache.lucene.index.IndexWriter;
|
|||
import org.apache.lucene.store.*;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestRuleMarkFailure;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
@ -40,7 +40,6 @@ import org.elasticsearch.index.store.FsDirectoryService;
|
|||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreModule;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -54,18 +53,12 @@ import java.util.*;
|
|||
|
||||
public class MockFSDirectoryService extends FsDirectoryService {
|
||||
|
||||
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
|
||||
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open";
|
||||
public static final String RANDOM_PREVENT_DOUBLE_WRITE = "index.store.mock.random.prevent_double_write";
|
||||
public static final String RANDOM_NO_DELETE_OPEN_FILE = "index.store.mock.random.no_delete_open_file";
|
||||
public static final String CRASH_INDEX = "index.store.mock.random.crash_index";
|
||||
|
||||
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
|
||||
IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY
|
||||
);
|
||||
|
||||
private final FsDirectoryService delegateService;
|
||||
private final boolean checkIndexOnClose;
|
||||
private final Random random;
|
||||
private final double randomIOExceptionRate;
|
||||
private final double randomIOExceptionRateOnOpen;
|
||||
|
@ -80,7 +73,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
|||
super(indexSettings, indexStore, path);
|
||||
final long seed = indexSettings.getAsLong(ESIntegTestCase.SETTING_INDEX_SEED, 0l);
|
||||
this.random = new Random(seed);
|
||||
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true);
|
||||
|
||||
randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d);
|
||||
randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d);
|
||||
preventDoubleWrite = indexSettings.getAsBoolean(RANDOM_PREVENT_DOUBLE_WRITE, true); // true is default in MDW
|
||||
|
@ -95,33 +88,6 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
|||
}
|
||||
this.indexSettings = indexSettings;
|
||||
delegateService = randomDirectorService(indexStore, path);
|
||||
if (checkIndexOnClose) {
|
||||
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
|
||||
|
||||
boolean canRun = false;
|
||||
|
||||
@Override
|
||||
public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (indexShard != null && shardId.equals(sid)) {
|
||||
if (validCheckIndexStates.contains(indexShard.state()) && IndexMetaData.isOnSharedFilesystem(indexSettings) == false) {
|
||||
canRun = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
|
||||
@IndexSettings Settings indexSettings) {
|
||||
if (shardId.equals(sid) && indexShard != null && canRun) {
|
||||
assert indexShard.state() == IndexShardState.CLOSED : "Current state must be closed";
|
||||
checkIndex(indexShard.store(), sid);
|
||||
}
|
||||
service.indicesLifecycle().removeListener(this);
|
||||
}
|
||||
};
|
||||
service.indicesLifecycle().addListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -135,7 +101,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public void checkIndex(Store store, ShardId shardId) {
|
||||
public static void checkIndex(ESLogger logger, Store store, ShardId shardId) {
|
||||
if (store.tryIncRef()) {
|
||||
logger.info("start check index");
|
||||
try {
|
||||
|
|
|
@ -19,22 +19,29 @@
|
|||
|
||||
package org.elasticsearch.test.store;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.shard.*;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.FsDirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.EnumSet;
|
||||
|
||||
public class MockFSIndexStore extends IndexStore {
|
||||
|
||||
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
|
||||
private final IndicesService indicesService;
|
||||
|
||||
public static class TestPlugin extends Plugin {
|
||||
|
@ -53,6 +60,15 @@ public class MockFSIndexStore extends IndexStore {
|
|||
public Settings additionalSettings() {
|
||||
return Settings.builder().put(IndexStoreModule.STORE_TYPE, "mock").build();
|
||||
}
|
||||
|
||||
public void onModule(IndexModule module) {
|
||||
Settings indexSettings = module.getIndexSettings();
|
||||
if ("mock".equals(indexSettings.get(IndexStoreModule.STORE_TYPE))) {
|
||||
if (indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true)) {
|
||||
module.addIndexEventListener(new Listener());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
|
@ -66,4 +82,18 @@ public class MockFSIndexStore extends IndexStore {
|
|||
return new MockFSDirectoryService(indexSettings, this, indicesService, path);
|
||||
}
|
||||
|
||||
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
|
||||
IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY
|
||||
);
|
||||
private static final class Listener implements IndexEventListener {
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
if (currentState == IndexShardState.CLOSED && validCheckIndexStates.contains(previousState) && IndexMetaData.isOnSharedFilesystem(indexShard.indexSettings()) == false) {
|
||||
ESLogger logger = Loggers.getLogger(getClass(), indexShard.indexSettings(), indexShard.shardId());
|
||||
MockFSDirectoryService.checkIndex(logger, indexShard.store(), indexShard.shardId());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue