Merge pull request #13853 from s1monw/remove_more_deps

Remove ClusterSerivce and IndexSettingsService dependency from IndexShard
This commit is contained in:
Simon Willnauer 2015-09-30 14:20:28 +02:00
commit 77c2f151d2
9 changed files with 157 additions and 161 deletions

View File

@ -24,6 +24,7 @@ import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -382,6 +383,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap();
settingsService.addListener(indexShard);
success = true;
return indexShard;
} catch (IOException e) {
@ -433,6 +435,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
settingsService.removeListener(indexShard);
try {
final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted
indexShard.close(reason, flushEngine);
@ -453,18 +456,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
/**
* This method gets an instance for each of the given classes passed and calls #close() on the returned instance.
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
*/
private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable>... toClose) {
for (Class<? extends Closeable> closeable : toClose) {
if (closeInjectorOptionalResource(shardId, shardInjector, closeable) == false) {
logger.warn("[{}] no instance available for [{}], ignoring... ", shardId, closeable.getSimpleName());
}
}
}
/**
* Closes an optional resource. Returns true if the resource was found;
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log

View File

@ -73,6 +73,12 @@ public class IndexSettingsService extends AbstractIndexComponent {
this.listeners.remove(listener);
}
/**
* Returns <code>true</code> iff the given listener is already registered otherwise <code>false</code>
*/
public boolean isRegistered(Listener listener) {
return listeners.contains(listener);
}
public interface Listener {
void onRefreshSettings(Settings settings);
}

View File

@ -37,10 +37,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -85,6 +83,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -121,10 +120,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class IndexShard extends AbstractIndexShardComponent {
public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private final IndexCache indexCache;
@ -144,8 +142,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexFieldDataService indexFieldDataService;
private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric();
private final ShardBitsetFilterCache shardBitsetFilterCache;
private final DiscoveryNode localNode;
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private final CodecService codecService;
@ -171,9 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private RecoveryState recoveryState;
private final RecoveryStats recoveryStats = new RecoveryStats();
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
@ -200,13 +193,13 @@ public class IndexShard extends AbstractIndexShardComponent {
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store,
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
IndicesQueryCache indicesQueryCache, CodecService codecService,
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
super(shardId, indexSettingsService.getSettings());
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
super(shardId, indexSettings);
this.codecService = codecService;
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@ -215,7 +208,6 @@ public class IndexShard extends AbstractIndexShardComponent {
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = factory;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = threadPool;
@ -235,12 +227,9 @@ public class IndexShard extends AbstractIndexShardComponent {
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService);
this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState();
this.localNode = clusterService.localNode();
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
indexSettingsService.addListener(applyRefreshSettings);
this.path = path;
this.mergePolicyConfig = new MergePolicyConfig(logger, indexSettings);
/* create engine config */
@ -385,21 +374,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
/**
* Marks the shard as recovering based on a remote or local node, fails with exception is recovering is not allowed to be set.
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState.Type type, DiscoveryNode sourceNode) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, sourceNode, localNode));
}
/**
* Marks the shard as recovering based on a restore, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState.Type type, RestoreSource restoreSource) throws IndexShardStartedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, restoreSource, localNode));
}
private IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
public IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
@ -776,7 +753,6 @@ public class IndexShard extends AbstractIndexShardComponent {
public void close(String reason, boolean flushEngine) throws IOException {
synchronized (mutex) {
try {
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
@ -1060,7 +1036,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return path;
}
public boolean recoverFromStore(ShardRouting shard) {
public boolean recoverFromStore(ShardRouting shard, DiscoveryNode localNode) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
@ -1069,10 +1045,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) {
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository, DiscoveryNode locaNode) {
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
return storeRecovery.recoverFromRepository(this, repository, locaNode);
}
/**
@ -1094,110 +1070,108 @@ public class IndexShard extends AbstractIndexShardComponent {
return false;
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, IndexShard.this.flushThresholdOperations);
if (flushThresholdOperations != IndexShard.this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", IndexShard.this.flushThresholdOperations, flushThresholdOperations);
IndexShard.this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, IndexShard.this.flushThresholdSize);
if (!flushThresholdSize.equals(IndexShard.this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", IndexShard.this.flushThresholdSize, flushThresholdSize);
IndexShard.this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, IndexShard.this.disableFlush);
if (disableFlush != IndexShard.this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", IndexShard.this.disableFlush, disableFlush);
IndexShard.this.disableFlush = disableFlush;
}
@Override
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations);
if (flushThresholdOperations != this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations);
this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush);
if (disableFlush != this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush);
this.disableFlush = disableFlush;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
if (flushOnClose != IndexShard.this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose);
IndexShard.this.flushOnClose = flushOnClose;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
if (flushOnClose != this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
this.flushOnClose = flushOnClose;
}
TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name()));
if (type != translogConfig.getType()) {
logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type);
translogConfig.setType(type);
}
TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name()));
if (type != translogConfig.getType()) {
logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type);
translogConfig.setType(type);
}
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
if (durabilty != translogConfig.getDurabilty()) {
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
translogConfig.setDurabilty(durabilty);
}
final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty());
if (durabilty != translogConfig.getDurabilty()) {
logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty);
translogConfig.setDurabilty(durabilty);
}
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, IndexShard.this.refreshInterval);
if (!refreshInterval.equals(IndexShard.this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", IndexShard.this.refreshInterval, refreshInterval);
if (refreshScheduledFuture != null) {
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
IndexShard.this.refreshInterval = refreshInterval;
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
TimeValue refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, this.refreshInterval);
if (!refreshInterval.equals(this.refreshInterval)) {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
if (refreshScheduledFuture != null) {
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
this.refreshInterval = refreshInterval;
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
engine().onSettingsChanged();
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
}
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
engine().onSettingsChanged();
}
}
@ -1428,7 +1402,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig);
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
@ -34,6 +35,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
@ -54,7 +56,7 @@ import java.io.IOException;
public final class ShadowIndexShard extends IndexShard {
@Inject
public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService,
public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings,
IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
@ -62,14 +64,14 @@ public final class ShadowIndexShard extends IndexShard {
CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@Nullable IndicesWarmer warmer,
SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
EngineFactory factory,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store,
super(shardId, indexSettings, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,
warmer, similarityService,
factory, clusterService, path, bigArrays, wrappingService);
factory, path, bigArrays, wrappingService);
}
/**

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -72,7 +73,8 @@ final class StoreRecovery {
throw new IllegalStateException("can't recover - restore source is not null");
}
try {
indexShard.recovering("from store", RecoveryState.Type.STORE, localNode);
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode);
indexShard.recovering("from store", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
@ -93,19 +95,21 @@ final class StoreRecovery {
* @return <code>true</code> if the the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
*/
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository, DiscoveryNode localNode) {
if (canRecover(indexShard)) {
if (indexShard.routingEntry().restoreSource() == null) {
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.restoreSource() == null) {
throw new IllegalStateException("can't restore - restore source is null");
}
try {
indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
final RecoveryState recoveryState = new RecoveryState(shardId, shardRouting.primary(), RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode);
indexShard.recovering("from snapshot", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
logger.debug("restoring from {} ...", shardRouting.restoreSource());
restore(indexShard, repository);
});
}

View File

@ -677,14 +677,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
final boolean success;
final IndexShard shard = indexService.shard(shardId);
final DiscoveryNode localNode = clusterService.localNode();
if (restoreSource == null) {
// recover from filesystem store
success = shard.recoverFromStore(shardRouting);
success = shard.recoverFromStore(shardRouting, localNode);
} else {
// restore
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
try {
success = shard.restoreFromRepository(shardRouting, indexShardRepository);
success = shard.restoreFromRepository(shardRouting, indexShardRepository, localNode);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shard.shardId());

View File

@ -131,7 +131,8 @@ public class RecoveryTarget extends AbstractComponent {
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
try {
indexShard.recovering("from " + sourceNode, recoveryType, sourceNode);
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), recoveryType, sourceNode, clusterService.localNode());
indexShard.recovering("from " + sourceNode, recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());

View File

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
@ -115,7 +116,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)}
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository, DiscoveryNode)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -48,6 +49,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -787,7 +789,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue(newShard.recoverFromStore(routing));
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
@ -799,6 +802,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.shard(0);
@ -817,7 +821,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
try {
newShard.recoverFromStore(routing);
newShard.recoverFromStore(routing, localNode);
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
@ -826,11 +830,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so"));
ShardRoutingHelper.initialize(routing, origRouting.currentNodeId());
assertFalse("it's already recovering", newShard.recoverFromStore(routing));
assertFalse("it's already recovering", newShard.recoverFromStore(routing, localNode));
test.removeShard(0, "I broken it");
newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
@ -865,6 +869,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
Store targetStore = test_target_shard.store();
test_target_shard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
@ -893,7 +898,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
@Override
public void verify(String verificationToken) {
}
}));
}, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
@ -902,4 +907,15 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertSearchHits(client().prepareSearch("test_target").get(), "0");
}
public void testListenersAreRemoved() {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.shard(0);
IndexSettingsService settingsService = indexService.settingsService();
assertTrue(settingsService.isRegistered(shard));
indexService.removeShard(0, "simon says so");
assertFalse(settingsService.isRegistered(shard));
}
}