Remove ClusterSerivce and IndexSettingsService dependency from IndexShard

We have two unneded heavy dependencies on IndexShard that are unneeded and only cause
trouble if you try to mock index shard. This commit removes IndexSettingsService as well as
ClusterSerivce from IndexShard to simplify future mocking and construction.
This commit is contained in:
Simon Willnauer 2015-09-29 17:42:55 +02:00
parent d715dfd16c
commit 6fff824402
9 changed files with 157 additions and 161 deletions

View File

@ -24,6 +24,7 @@ import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -382,6 +383,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap();
settingsService.addListener(indexShard);
success = true;
return indexShard;
} catch (IOException e) {
@ -433,6 +435,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
settingsService.removeListener(indexShard);
try {
final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted
indexShard.close(reason, flushEngine);
@ -453,18 +456,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
/**
* This method gets an instance for each of the given classes passed and calls #close() on the returned instance.
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
*/
private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable>... toClose) {
for (Class<? extends Closeable> closeable : toClose) {
if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) {
logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName());
}
}
}
/**
* Closes an optional resource. Returns true if the resource was found;
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log

View File

@ -73,6 +73,12 @@ public class IndexSettingsService extends AbstractIndexComponent {
this.listeners.remove(listener);
}
/**
* Returns <code>true</code> iff the given listener is already registered otherwise <code>false</code>
*/
public boolean isRegistered(Listener listener) {
return listeners.contains(listener);
}
public interface Listener {
void onRefreshSettings(Settings settings);
}

View File

@ -37,10 +37,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -85,6 +83,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -121,10 +120,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class IndexShard extends AbstractIndexShardComponent {
public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private final IndexCache indexCache;
@ -144,8 +142,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexFieldDataService indexFieldDataService;
private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric();
private final ShardBitsetFilterCache shardBitsetFilterCache;
private final DiscoveryNode localNode;
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private final CodecService codecService;
@ -171,9 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private RecoveryState recoveryState;
private final RecoveryStats recoveryStats = new RecoveryStats();
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
@ -200,13 +193,13 @@ public class IndexShard extends AbstractIndexShardComponent {
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store,
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
IndicesQueryCache indicesQueryCache, CodecService codecService,
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
super(shardId, indexSettingsService.getSettings());
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
super(shardId, indexSettings);
this.codecService = codecService;
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@ -215,7 +208,6 @@ public class IndexShard extends AbstractIndexShardComponent {
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = factory;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = threadPool;
@ -235,12 +227,9 @@ public class IndexShard extends AbstractIndexShardComponent {
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService);
this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState();
this.localNode = clusterService.localNode();
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
indexSettingsService.addListener(applyRefreshSettings);
this.path = path;
this.mergePolicyConfig = new MergePolicyConfig(logger, indexSettings);
/* create engine config */
@ -385,21 +374,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
/**
* Marks the shard as recovering based on a remote or local node, fails with exception is recovering is not allowed to be set.
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState.Type type, DiscoveryNode sourceNode) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, sourceNode, localNode));
}
/**
* Marks the shard as recovering based on a restore, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState.Type type, RestoreSource restoreSource) throws IndexShardStartedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, restoreSource, localNode));
}
private IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
public IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
@ -776,7 +753,6 @@ public class IndexShard extends AbstractIndexShardComponent {
public void close(String reason, boolean flushEngine) throws IOException {
synchronized (mutex) {
try {
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
@ -1060,7 +1036,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return path;
}
public boolean recoverFromStore(ShardRouting shard) {
public boolean recoverFromStore(ShardRouting shard, DiscoveryNode localNode) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
@ -1069,10 +1045,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) {
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository, DiscoveryNode locaNode) {
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
return storeRecovery.recoverFromRepository(this, repository, locaNode);
}
/**
@ -1094,110 +1070,108 @@ public class IndexShard extends AbstractIndexShardComponent {
return false;
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations);
if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations);
IndexShard.this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize);
if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize);
IndexShard.this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush);
if (disableFlush != IndexShard.this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush);
IndexShard.this.disableFlush = disableFlush;
}
@Override
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations);
if (flushThresholdOperations != this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations);
this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush);
if (disableFlush != this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush);
this.disableFlush = disableFlush;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
if (flushOnClose != IndexShard.this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose);
IndexShard.this.flushOnClose = flushOnClose;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
if (flushOnClose != this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
this.flushOnClose = flushOnClose;
}
TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name()));
if (type != translogConfig.getType()) {
logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type);
translogConfig.setType(type);
}
TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name()));
if (type != translogConfig.getType()) {
logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type);
translogConfig.setType(type);
}
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
if (durabilty != translogConfig.getDurabilty()) {
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
translogConfig.setDurabilty(durabilty);
}
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
if (durabilty != translogConfig.getDurabilty()) {
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
translogConfig.setDurabilty(durabilty);
}
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval);
if (!refreshInterval.equals(IndexShard.this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval);
if (refreshScheduledFuture != null) {
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
IndexShard.this.refreshInterval = refreshInterval;
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
if (!refreshInterval.equals(this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
if (refreshScheduledFuture != null) {
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
this.refreshInterval = refreshInterval;
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
engine().onSettingsChanged();
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
}
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
engine().onSettingsChanged();
}
}
@ -1428,7 +1402,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig);
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
@ -34,6 +35,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
@ -54,7 +56,7 @@ import java.io.IOException;
public final class ShadowIndexShard extends IndexShard {
@Inject
public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService,
public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings,
IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
@ -62,14 +64,14 @@ public final class ShadowIndexShard extends IndexShard {
CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@Nullable IndicesWarmer warmer,
SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
EngineFactory factory,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store,
super(shardId, indexSettings, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,
warmer, similarityService,
factory, clusterService, path, bigArrays, wrappingService);
factory, path, bigArrays, wrappingService);
}
/**

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -72,7 +73,8 @@ final class StoreRecovery {
throw new IllegalStateException("can't recover - restore source is not null");
}
try {
indexShard.recovering("from store", RecoveryState.Type.STORE, localNode);
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode);
indexShard.recovering("from store", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
@ -93,19 +95,21 @@ final class StoreRecovery {
* @return <code>true</code> if the the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) {
if (canRecover(indexShard)) {
if (indexShard.routingEntry().restoreSource() == null) {
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.restoreSource() == null) {
throw new IllegalStateException("can't restore - restore source is null");
}
try {
indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
final RecoveryState recoveryState = new RecoveryState(shardId, shardRouting.primary(), RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode);
indexShard.recovering("from snapshot", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
logger.debug("restoring from {} ...", shardRouting.restoreSource());
restore(indexShard, repository);
});
}

View File

@ -677,14 +677,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
final boolean success;
final IndexShard shard = indexService.shard(shardId);
final DiscoveryNode localNode = clusterService.localNode();
if (restoreSource == null) {
// recover from filesystem store
success = shard.recoverFromStore(shardRouting);
success = shard.recoverFromStore(shardRouting, localNode);
} else {
// restore
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
try {
success = shard.restoreFromRepository(shardRouting, indexShardRepository);
success = shard.restoreFromRepository(shardRouting, indexShardRepository, localNode);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shard.shardId());

View File

@ -131,7 +131,8 @@ public class RecoveryTarget extends AbstractComponent {
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
try {
indexShard.recovering("from " + sourceNode, recoveryType, sourceNode);
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), recoveryType, sourceNode, clusterService.localNode());
indexShard.recovering("from " + sourceNode, recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());

View File

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
@ -115,7 +116,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)}
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository, DiscoveryNode)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -48,6 +49,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -787,7 +789,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue(newShard.recoverFromStore(routing));
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
@ -799,6 +802,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.shard(0);
@ -817,7 +821,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
try {
newShard.recoverFromStore(routing);
newShard.recoverFromStore(routing, localNode);
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
@ -826,11 +830,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so"));
ShardRoutingHelper.initialize(routing, origRouting.currentNodeId());
assertFalse("it's already recovering", newShard.recoverFromStore(routing));
assertFalse("it's already recovering", newShard.recoverFromStore(routing, localNode));
test.removeShard(0, "I broken it");
newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
@ -865,6 +869,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
Store targetStore = test_target_shard.store();
test_target_shard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
@ -893,7 +898,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
@Override
public void verify(String verificationToken) {
}
}));
}, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
@ -902,4 +907,15 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertSearchHits(client().prepareSearch("test_target").get(), "0");
}
public void testListenersAreRemoved() {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.shard(0);
IndexSettingsService settingsService = indexService.settingsService();
assertTrue(settingsService.isRegistered(shard));
indexService.removeShard(0, "simon says so");
assertFalse(settingsService.isRegistered(shard));
}
}