OpenSearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java

452 lines
17 KiB
Java
Raw Normal View History

2010-02-08 15:30:06 +02:00
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
2010-02-08 15:30:06 +02:00
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.service;
2010-02-08 15:30:06 +02:00
2011-12-06 02:42:25 +02:00
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
2011-12-06 02:42:25 +02:00
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
2011-12-06 02:42:25 +02:00
import org.elasticsearch.index.*;
2011-05-21 19:53:17 -04:00
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineModule;
import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldDataModule;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayModule;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.get.ShardGetModule;
import org.elasticsearch.index.indexing.ShardIndexingModule;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.PercolatorShardModule;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.stats.ShardSearchModule;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShardCreationException;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotModule;
import org.elasticsearch.index.store.IndexStore;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
2014-03-27 11:33:23 +11:00
import org.elasticsearch.index.suggest.SuggestShardModule;
import org.elasticsearch.index.termvectors.ShardTermVectorModule;
import org.elasticsearch.index.translog.Translog;
2010-02-08 15:30:06 +02:00
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool;
2010-02-08 15:30:06 +02:00
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
2010-02-08 15:30:06 +02:00
2011-12-06 02:42:25 +02:00
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
2010-02-08 15:30:06 +02:00
/**
2011-12-06 02:42:25 +02:00
*
2010-02-08 15:30:06 +02:00
*/
public class InternalIndexService extends AbstractIndexComponent implements IndexService {
private final Injector injector;
private final Settings indexSettings;
private final ThreadPool threadPool;
private final PluginsService pluginsService;
private final InternalIndicesLifecycle indicesLifecycle;
private final AnalysisService analysisService;
2010-02-08 15:30:06 +02:00
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private final SimilarityService similarityService;
2011-05-21 19:53:17 -04:00
private final IndexAliasesService aliasesService;
private final IndexCache indexCache;
2010-02-08 15:30:06 +02:00
private final IndexFieldDataService indexFieldData;
private final IndexEngine indexEngine;
private final IndexGateway indexGateway;
private final IndexStore indexStore;
private final IndexSettingsService settingsService;
2010-02-08 15:30:06 +02:00
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
private volatile boolean closed = false;
2011-12-06 02:42:25 +02:00
@Inject
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine,
IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData) {
2010-02-08 15:30:06 +02:00
super(index, indexSettings);
this.injector = injector;
this.threadPool = threadPool;
2010-02-08 15:30:06 +02:00
this.indexSettings = indexSettings;
this.analysisService = analysisService;
2010-02-08 15:30:06 +02:00
this.mapperService = mapperService;
this.queryParserService = queryParserService;
this.similarityService = similarityService;
2011-05-21 19:53:17 -04:00
this.aliasesService = aliasesService;
this.indexCache = indexCache;
this.indexFieldData = indexFieldData;
this.indexEngine = indexEngine;
this.indexGateway = indexGateway;
this.indexStore = indexStore;
this.settingsService = settingsService;
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
// inject workarounds for cyclic dep
indexCache.filter().setIndexService(this);
indexFieldData.setIndexService(this);
2010-02-08 15:30:06 +02:00
}
2011-12-06 02:42:25 +02:00
@Override
public int numberOfShards() {
2010-02-08 15:30:06 +02:00
return shards.size();
}
2011-12-06 02:42:25 +02:00
@Override
public UnmodifiableIterator<IndexShard> iterator() {
2010-02-08 15:30:06 +02:00
return shards.values().iterator();
}
2011-12-06 02:42:25 +02:00
@Override
public boolean hasShard(int shardId) {
2010-02-08 15:30:06 +02:00
return shards.containsKey(shardId);
}
2011-12-06 02:42:25 +02:00
@Override
public IndexShard shard(int shardId) {
2010-02-08 15:30:06 +02:00
return shards.get(shardId);
}
2011-12-06 02:42:25 +02:00
@Override
public IndexShard shardSafe(int shardId) throws IndexShardMissingException {
2010-02-08 15:30:06 +02:00
IndexShard indexShard = shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
return indexShard;
}
2011-12-06 02:42:25 +02:00
@Override
public ImmutableSet<Integer> shardIds() {
return shards.keySet();
2010-02-08 15:30:06 +02:00
}
2011-12-06 02:42:25 +02:00
@Override
public Injector injector() {
2010-02-08 15:30:06 +02:00
return injector;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexGateway gateway() {
return indexGateway;
}
@Override
public IndexSettingsService settingsService() {
return this.settingsService;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexStore store() {
return indexStore;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexCache cache() {
return indexCache;
2010-02-08 15:30:06 +02:00
}
@Override
public IndexFieldDataService fieldData() {
return indexFieldData;
}
2011-12-06 02:42:25 +02:00
@Override
public AnalysisService analysisService() {
return this.analysisService;
}
2011-12-06 02:42:25 +02:00
@Override
public MapperService mapperService() {
2010-02-08 15:30:06 +02:00
return mapperService;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexQueryParserService queryParserService() {
2010-02-08 15:30:06 +02:00
return queryParserService;
}
2011-12-06 02:42:25 +02:00
@Override
public SimilarityService similarityService() {
2010-02-08 15:30:06 +02:00
return similarityService;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexAliasesService aliasesService() {
2011-05-21 19:53:17 -04:00
return aliasesService;
}
2011-12-06 02:42:25 +02:00
@Override
public IndexEngine engine() {
return indexEngine;
}
public void close(final String reason, @Nullable Executor executor) {
synchronized (this) {
closed = true;
}
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
for (final int shardId : shardIds) {
executor = executor == null ? threadPool.generic() : executor;
executor.execute(new Runnable() {
2011-12-06 02:42:25 +02:00
@Override
public void run() {
try {
removeShard(shardId, reason);
} catch (Throwable e) {
logger.warn("failed to close shard", e);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
logger.debug("Interrupted closing index [{}]", e, index().name());
Thread.currentThread().interrupt();
2010-02-08 15:30:06 +02:00
}
}
2011-12-06 02:42:25 +02:00
@Override
public Injector shardInjector(int shardId) throws ElasticsearchException {
2010-02-08 15:30:06 +02:00
return shardsInjectors.get(shardId);
}
2011-12-06 02:42:25 +02:00
@Override
public Injector shardInjectorSafe(int shardId) throws IndexShardMissingException {
2010-02-08 15:30:06 +02:00
Injector shardInjector = shardInjector(shardId);
if (shardInjector == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
return shardInjector;
}
@Override
public String indexUUID() {
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
}
2011-12-06 02:42:25 +02:00
@Override
public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
* keep it synced.
*/
if (closed) {
throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
2010-02-08 15:30:06 +02:00
ShardId shardId = new ShardId(index, sShardId);
if (shardsInjectors.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists");
}
indicesLifecycle.beforeIndexShardCreated(shardId);
logger.debug("creating shard_id [{}]", shardId.id());
2010-02-08 15:30:06 +02:00
ModulesBuilder modules = new ModulesBuilder();
2010-04-27 23:54:30 +03:00
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(indexSettings, shardId));
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class)));
2010-04-27 23:54:30 +03:00
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
modules.add(new ShardFilterCacheModule());
modules.add(new ShardFieldDataModule());
2010-04-27 23:54:30 +03:00
modules.add(new TranslogModule(indexSettings));
modules.add(new EngineModule(indexSettings));
modules.add(new IndexShardGatewayModule(injector.getInstance(IndexGateway.class)));
modules.add(new PercolatorShardModule());
modules.add(new ShardTermVectorModule());
modules.add(new IndexShardSnapshotModule());
2014-03-27 11:33:23 +11:00
modules.add(new SuggestShardModule());
2010-04-27 23:54:30 +03:00
Injector shardInjector;
try {
shardInjector = modules.createChildInjector(injector);
} catch (CreationException e) {
throw new IndexShardCreationException(shardId, Injectors.getFirstErrorFailure(e));
} catch (Throwable e) {
throw new IndexShardCreationException(shardId, e);
}
2010-02-08 15:30:06 +02:00
shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap();
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);
2010-02-08 15:30:06 +02:00
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
return indexShard;
}
2011-12-06 02:42:25 +02:00
@Override
public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
final Injector shardInjector;
final IndexShard indexShard;
final ShardId sId = new ShardId(index, shardId);
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
indicesLifecycle.beforeIndexShardClosed(sId, indexShard);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
try {
shardInjector.getInstance(closeable).close();
} catch (Throwable e) {
logger.debug("failed to clean plugin shard service [{}]", e, closeable);
}
}
try {
// now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close();
} catch (Throwable e) {
logger.debug("failed to close translog service", e);
// ignore
}
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
try {
((InternalIndexShard) indexShard).close(reason);
} catch (Throwable e) {
logger.debug("failed to close index shard", e);
// ignore
}
}
try {
shardInjector.getInstance(Engine.class).close();
} catch (Throwable e) {
logger.debug("failed to close engine", e);
// ignore
}
try {
shardInjector.getInstance(MergeSchedulerProvider.class).close();
} catch (Throwable e) {
logger.debug("failed to close merge policy scheduler", e);
// ignore
}
try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Throwable e) {
logger.debug("failed to close merge policy provider", e);
// ignore
}
try {
shardInjector.getInstance(IndexShardGatewayService.class).close();
} catch (Throwable e) {
logger.debug("failed to close index shard gateway", e);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
} catch (Throwable e) {
logger.debug("failed to close translog", e);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(PercolatorQueriesRegistry.class).close();
} catch (Throwable e) {
logger.debug("failed to close PercolatorQueriesRegistry", e);
// ignore
}
2010-02-08 15:30:06 +02:00
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId);
// if we delete or have no gateway or the store is not persistent, clean the store...
2010-02-08 15:30:06 +02:00
Store store = shardInjector.getInstance(Store.class);
// and close it
2010-02-08 15:30:06 +02:00
try {
store.close();
} catch (Throwable e) {
logger.warn("failed to close store on shard deletion", e);
2010-02-08 15:30:06 +02:00
}
Injectors.close(injector);
}
}