diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 8ebc671af6f..21d92df4bd7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -161,6 +161,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent { /** All known time settings for an index. */ public static final Set INDEX_TIME_SETTINGS = ImmutableSet.of( "index.gateway.wait_for_mapping_update_post_recovery", + "index.shard.wait_for_mapping_update_post_recovery", "index.gc_deletes", "index.indexing.slowlog.threshold.index.debug", "index.indexing.slowlog.threshold.index.info", diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 009b1a81571..e23a623add0 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -36,12 +36,10 @@ import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; -import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; import org.elasticsearch.index.fielddata.IndexFieldDataService; -import org.elasticsearch.index.gateway.IndexShardGatewayService; +import org.elasticsearch.index.shard.StoreRecoveryService; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; @@ -60,10 +58,8 @@ import org.elasticsearch.plugins.ShardsPluginsModule; import java.io.Closeable; import java.io.IOException; -import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -306,12 +302,16 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false || (primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); - final ShardFilterCache shardFilterCache = new ShardFilterCache(shardId, injector.getInstance(IndicesFilterCache.class)); ModulesBuilder modules = new ModulesBuilder(); modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); - modules.add(new IndexShardModule(shardId, primary, indexSettings, shardFilterCache)); + modules.add(new IndexShardModule(shardId, primary, indexSettings)); modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock, - new StoreCloseListener(shardId, canDeleteShardContent, shardFilterCache), path)); + new StoreCloseListener(shardId, canDeleteShardContent, new Closeable() { + @Override + public void close() throws IOException { + injector.getInstance(IndicesFilterCache.class).onClose(shardId); + } + }), path)); modules.add(new DeletionPolicyModule(indexSettings)); try { shardInjector = modules.createChildInjector(injector); @@ -387,8 +387,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } closeInjectorResource(sId, shardInjector, - IndexShardGatewayService.class, - PercolatorQueriesRegistry.class); + StoreRecoveryService.class); // call this before we close the store, so we can release resources for it indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings); diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java index f5827dcf4cf..730539974ff 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java @@ -32,7 +32,6 @@ public class ShardBitsetFilterCache extends AbstractIndexShardComponent { private final CounterMetric totalMetric = new CounterMetric(); - @Inject public ShardBitsetFilterCache(ShardId shardId, @IndexSettings Settings indexSettings) { super(shardId, indexSettings); } diff --git a/core/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java deleted file mode 100644 index 550c25bdbf6..00000000000 --- a/core/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.cache.filter; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.cache.filter.IndicesFilterCache; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - */ -public class ShardFilterCache implements Closeable { - final IndicesFilterCache cache; - final ShardId shardId; - - public ShardFilterCache(ShardId shardId, IndicesFilterCache cache) { - this.cache = cache; - this.shardId = shardId; - } - - public FilterCacheStats stats() { - return cache.getStats(shardId); - } - - @Override - public void close() throws IOException { - cache.onClose(shardId); - } - -} diff --git a/core/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java b/core/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java index 808542fadc4..0cece67803c 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java @@ -39,7 +39,6 @@ public class ShardQueryCache extends AbstractIndexShardComponent implements Remo final CounterMetric hitCount = new CounterMetric(); final CounterMetric missCount = new CounterMetric(); - @Inject public ShardQueryCache(ShardId shardId, @IndexSettings Settings indexSettings) { super(shardId, indexSettings); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 5a6a752df44..01148339966 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -45,7 +45,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 51deb45907f..be8aa252cf1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; public final class EngineConfig { private final ShardId shardId; private final TranslogRecoveryPerformer translogRecoveryPerformer; + private final Settings indexSettings; private volatile ByteSizeValue indexingBufferSize; private volatile ByteSizeValue versionMapSize; private volatile String versionMapSizeSetting; @@ -64,7 +65,6 @@ public final class EngineConfig { private final boolean optimizeAutoGenerateId; private final ThreadPool threadPool; private final ShardIndexingService indexingService; - private final IndexSettingsService indexSettingsService; @Nullable private final IndicesWarmer warmer; private final Store store; @@ -141,14 +141,14 @@ public final class EngineConfig { * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, - IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, + Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, TranslogConfig translogConfig) { this.shardId = shardId; + this.indexSettings = indexSettings; this.threadPool = threadPool; this.indexingService = indexingService; - this.indexSettingsService = indexSettingsService; this.warmer = warmer; this.store = store; this.deletionPolicy = deletionPolicy; @@ -158,7 +158,6 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; - Settings indexSettings = indexSettingsService.getSettings(); this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false); this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65))); @@ -364,7 +363,7 @@ public final class EngineConfig { * Returns the latest index settings directly from the index settings service. */ public Settings getIndexSettings() { - return indexSettingsService.getSettings(); + return indexSettings; } /** @@ -429,9 +428,4 @@ public final class EngineConfig { public TranslogConfig getTranslogConfig() { return translogConfig; } - - IndexSettingsService getIndexSettingsService() { // for testing - return indexSettingsService; - } - } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java index f5edf6e21de..4b1f42060a6 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java @@ -36,18 +36,12 @@ import java.util.concurrent.ConcurrentMap; /** */ -public class ShardFieldData extends AbstractIndexShardComponent implements IndexFieldDataCache.Listener { +public class ShardFieldData implements IndexFieldDataCache.Listener { final CounterMetric evictionsMetric = new CounterMetric(); final CounterMetric totalMetric = new CounterMetric(); - final ConcurrentMap perFieldTotals = ConcurrentCollections.newConcurrentMap(); - @Inject - public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); - } - public FieldDataStats stats(String... fields) { ObjectLongHashMap fieldTotals = null; if (fields != null && fields.length > 0) { diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldDataModule.java b/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldDataModule.java deleted file mode 100644 index c64705666eb..00000000000 --- a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldDataModule.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.fielddata; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - */ -public class ShardFieldDataModule extends AbstractModule { - - @Override - protected void configure() { - bind(ShardFieldData.class).asEagerSingleton(); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IgnoreGatewayRecoveryException.java b/core/src/main/java/org/elasticsearch/index/gateway/IgnoreGatewayRecoveryException.java deleted file mode 100644 index 00fe6d5c5cb..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IgnoreGatewayRecoveryException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.index.shard.IndexShardException; -import org.elasticsearch.index.shard.ShardId; - -/** - * An exception marking that this recovery attempt should be ignored (since probably, we already recovered). - * - * - */ -public class IgnoreGatewayRecoveryException extends IndexShardException { - - public IgnoreGatewayRecoveryException(ShardId shardId, String msg) { - super(shardId, msg); - } - - public IgnoreGatewayRecoveryException(ShardId shardId, String msg, Throwable cause) { - super(shardId, msg, cause); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java deleted file mode 100644 index 2bd6a8c9bee..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.Directory; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.EngineException; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.RecoveryState; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * - */ -public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable { - - private final MappingUpdatedAction mappingUpdatedAction; - private final IndexService indexService; - private final IndexShard indexShard; - private final TimeValue waitForMappingUpdatePostRecovery; - - private final CancellableThreads cancellableThreads = new CancellableThreads(); - - - @Inject - public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction, - IndexService indexService, IndexShard indexShard) { - super(shardId, indexSettings); - this.mappingUpdatedAction = mappingUpdatedAction; - this.indexService = indexService; - this.indexShard = indexShard; - - this.waitForMappingUpdatePostRecovery = indexSettings.getAsTime("index.gateway.wait_for_mapping_update_post_recovery", TimeValue.timeValueSeconds(15)); - } - - /** - * Recovers the state of the shard from the gateway. - */ - public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException { - indexShard.prepareForIndexRecovery(); - long version = -1; - final Map typesToUpdate; - SegmentInfos si = null; - final Store store = indexShard.store(); - store.incRef(); - try { - try { - store.failIfCorrupted(); - try { - si = store.readLastCommittedSegmentsInfo(); - } catch (Throwable e) { - String files = "_unknown_"; - try { - files = Arrays.toString(store.directory().listAll()); - } catch (Throwable e1) { - files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")"; - } - if (indexShouldExists) { - throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e); - } - } - if (si != null) { - if (indexShouldExists) { - version = si.getVersion(); - } else { - // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) - // its a "new index create" API, we have to do something, so better to clean it than use same data - logger.trace("cleaning existing shard, shouldn't exists"); - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); - writer.close(); - recoveryState.getTranslog().totalOperations(0); - } - } - } catch (Throwable e) { - throw new IndexShardGatewayRecoveryException(shardId(), "failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(version); - - // since we recover from local, just fill the files and size - try { - final RecoveryState.Index index = recoveryState.getIndex(); - if (si != null) { - final Directory directory = store.directory(); - for (String name : Lucene.files(si)) { - long length = directory.fileLength(name); - index.addFileDetail(name, length, true); - } - } - } catch (IOException e) { - logger.debug("failed to list file details", e); - } - if (indexShouldExists == false) { - recoveryState.getTranslog().totalOperations(0); - recoveryState.getTranslog().totalOperationsOnStart(0); - } - typesToUpdate = indexShard.performTranslogRecovery(); - - indexShard.finalizeRecovery(); - for (Map.Entry entry : typesToUpdate.entrySet()) { - validateMappingUpdate(entry.getKey(), entry.getValue()); - } - indexShard.postRecovery("post recovery from gateway"); - } catch (EngineException e) { - throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e); - } finally { - store.decRef(); - } - } - - private void validateMappingUpdate(final String type, Mapping update) { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference error = new AtomicReference<>(); - mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() { - @Override - public void onMappingUpdate() { - latch.countDown(); - } - - @Override - public void onFailure(Throwable t) { - latch.countDown(); - error.set(t); - } - }); - cancellableThreads.execute(new CancellableThreads.Interruptable() { - @Override - public void run() throws InterruptedException { - try { - if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) { - logger.debug("waited for mapping update on master for [{}], yet timed out", type); - } else { - if (error.get() != null) { - throw new IndexShardGatewayRecoveryException(shardId, "Failed to propagate mappings on master post recovery", error.get()); - } - } - } catch (InterruptedException e) { - logger.debug("interrupted while waiting for mapping update"); - throw e; - } - } - }); - } - - @Override - public void close() { - cancellableThreads.cancel("closed"); - } - - @Override - public String toString() { - return "shard_gateway"; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayException.java b/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayException.java deleted file mode 100644 index 5711451965d..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.index.shard.IndexShardException; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -public class IndexShardGatewayException extends IndexShardException { - - public IndexShardGatewayException(ShardId shardId, String msg) { - super(shardId, msg); - } - - public IndexShardGatewayException(ShardId shardId, String msg, Throwable cause) { - super(shardId, msg, cause); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayRecoveryException.java b/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayRecoveryException.java deleted file mode 100644 index 72c414c75ea..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayRecoveryException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -public class IndexShardGatewayRecoveryException extends IndexShardGatewayException { - - public IndexShardGatewayRecoveryException(ShardId shardId, String msg) { - super(shardId, msg); - } - - public IndexShardGatewayRecoveryException(ShardId shardId, String msg, Throwable cause) { - super(shardId, msg, cause); - } - -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java deleted file mode 100644 index 74c48f38b47..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.*; -import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.io.IOException; - -import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; - -/** - * - */ -public class IndexShardGatewayService extends AbstractIndexShardComponent implements Closeable { - - private final ThreadPool threadPool; - - private final ClusterService clusterService; - - private final IndexShard indexShard; - - private final IndexShardGateway shardGateway; - - private final IndexShardSnapshotAndRestoreService snapshotService; - - @Inject - public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, - IndexShard indexShard, IndexShardGateway shardGateway, IndexShardSnapshotAndRestoreService snapshotService, ClusterService clusterService) { - super(shardId, indexSettings); - this.threadPool = threadPool; - this.indexShard = indexShard; - this.shardGateway = shardGateway; - this.snapshotService = snapshotService; - this.clusterService = clusterService; - } - - /** - * Should be called when the shard routing state has changed (note, after the state has been set on the shard). - */ - public void routingStateChanged() { - } - - public static interface RecoveryListener { - void onRecoveryDone(); - - void onIgnoreRecovery(String reason); - - void onRecoveryFailed(IndexShardGatewayRecoveryException e); - } - - /** - * Recovers the state of the shard from the gateway. - */ - public void recover(final boolean indexShouldExists, final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - listener.onIgnoreRecovery("shard closed"); - return; - } - if (!indexShard.routingEntry().primary()) { - listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Trying to recover when the shard is in backup state", null)); - return; - } - try { - if (indexShard.routingEntry().restoreSource() != null) { - indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource()); - } else { - indexShard.recovering("from gateway", RecoveryState.Type.GATEWAY, clusterService.localNode()); - } - } catch (IllegalIndexShardStateException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already recovering - listener.onIgnoreRecovery("already in recovering process, " + e.getMessage()); - return; - } - - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - - try { - final RecoveryState recoveryState = indexShard.recoveryState(); - if (indexShard.routingEntry().restoreSource() != null) { - logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource()); - snapshotService.restore(recoveryState); - } else { - logger.debug("starting recovery from {} ...", shardGateway); - shardGateway.recover(indexShouldExists, recoveryState); - } - - // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway - // to call post recovery. - IndexShardState shardState = indexShard.state(); - assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]"; - - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n"); - RecoveryState.Index index = recoveryState.getIndex(); - sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") - .append(new ByteSizeValue(index.totalBytes())).append("], took[") - .append(TimeValue.timeValueMillis(index.time())).append("]\n"); - sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") - .append(new ByteSizeValue(index.recoveredBytes())).append("]\n"); - sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") - .append(new ByteSizeValue(index.reusedBytes())).append("]\n"); - sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") - .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) - .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]"); - logger.trace(sb.toString()); - } else if (logger.isDebugEnabled()) { - logger.debug("recovery completed from [{}], took [{}]", shardGateway, timeValueMillis(recoveryState.getTimer().time())); - } - listener.onRecoveryDone(); - } catch (IndexShardGatewayRecoveryException e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - listener.onIgnoreRecovery("shard closed"); - return; - } - if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { - // got closed on us, just ignore this recovery - listener.onIgnoreRecovery("shard closed"); - return; - } - listener.onRecoveryFailed(e); - } catch (IndexShardClosedException e) { - listener.onIgnoreRecovery("shard closed"); - } catch (IndexShardNotStartedException e) { - listener.onIgnoreRecovery("shard closed"); - } catch (Exception e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - listener.onIgnoreRecovery("shard closed"); - return; - } - listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e)); - } - } - }); - } - - @Override - public synchronized void close() throws IOException { - shardGateway.close(); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotNotAllowedException.java b/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotNotAllowedException.java deleted file mode 100644 index 64a147b2195..00000000000 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotNotAllowedException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -public class IndexShardGatewaySnapshotNotAllowedException extends IndexShardGatewayException { - - public IndexShardGatewaySnapshotNotAllowedException(ShardId shardId, String msg) { - super(shardId, msg); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/indexing/slowlog/ShardSlowLogIndexingService.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java similarity index 56% rename from core/src/main/java/org/elasticsearch/index/indexing/slowlog/ShardSlowLogIndexingService.java rename to core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java index 2cc3b3357a0..2729e163d15 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/slowlog/ShardSlowLogIndexingService.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java @@ -17,9 +17,8 @@ * under the License. */ -package org.elasticsearch.index.indexing.slowlog; +package org.elasticsearch.index.indexing; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -27,10 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.Locale; @@ -38,7 +34,7 @@ import java.util.concurrent.TimeUnit; /** */ -public class ShardSlowLogIndexingService extends AbstractIndexShardComponent { +public final class IndexingSlowLog { private boolean reformat; @@ -52,53 +48,16 @@ public class ShardSlowLogIndexingService extends AbstractIndexShardComponent { private final ESLogger indexLogger; private final ESLogger deleteLogger; - public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN = "index.indexing.slowlog.threshold.index.warn"; - public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO = "index.indexing.slowlog.threshold.index.info"; - public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG = "index.indexing.slowlog.threshold.index.debug"; - public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE = "index.indexing.slowlog.threshold.index.trace"; - public static final String INDEX_INDEXING_SLOWLOG_REFORMAT = "index.indexing.slowlog.reformat"; - public static final String INDEX_INDEXING_SLOWLOG_LEVEL = "index.indexing.slowlog.level"; - - class ApplySettings implements IndexSettingsService.Listener { - @Override - public synchronized void onRefreshSettings(Settings settings) { - long indexWarnThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexWarnThreshold)).nanos(); - if (indexWarnThreshold != ShardSlowLogIndexingService.this.indexWarnThreshold) { - ShardSlowLogIndexingService.this.indexWarnThreshold = indexWarnThreshold; - } - long indexInfoThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexInfoThreshold)).nanos(); - if (indexInfoThreshold != ShardSlowLogIndexingService.this.indexInfoThreshold) { - ShardSlowLogIndexingService.this.indexInfoThreshold = indexInfoThreshold; - } - long indexDebugThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexDebugThreshold)).nanos(); - if (indexDebugThreshold != ShardSlowLogIndexingService.this.indexDebugThreshold) { - ShardSlowLogIndexingService.this.indexDebugThreshold = indexDebugThreshold; - } - long indexTraceThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE, TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexTraceThreshold)).nanos(); - if (indexTraceThreshold != ShardSlowLogIndexingService.this.indexTraceThreshold) { - ShardSlowLogIndexingService.this.indexTraceThreshold = indexTraceThreshold; - } - - String level = settings.get(INDEX_INDEXING_SLOWLOG_LEVEL, ShardSlowLogIndexingService.this.level); - if (!level.equals(ShardSlowLogIndexingService.this.level)) { - ShardSlowLogIndexingService.this.indexLogger.setLevel(level.toUpperCase(Locale.ROOT)); - ShardSlowLogIndexingService.this.deleteLogger.setLevel(level.toUpperCase(Locale.ROOT)); - ShardSlowLogIndexingService.this.level = level; - } - - boolean reformat = settings.getAsBoolean(INDEX_INDEXING_SLOWLOG_REFORMAT, ShardSlowLogIndexingService.this.reformat); - if (reformat != ShardSlowLogIndexingService.this.reformat) { - ShardSlowLogIndexingService.this.reformat = reformat; - } - } - } - - @Inject - public ShardSlowLogIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService) { - super(shardId, indexSettings); + private static final String INDEX_INDEXING_SLOWLOG_PREFIX = "index.indexing.slowlog"; + public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN = INDEX_INDEXING_SLOWLOG_PREFIX +".threshold.index.warn"; + public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO = INDEX_INDEXING_SLOWLOG_PREFIX +".threshold.index.info"; + public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG = INDEX_INDEXING_SLOWLOG_PREFIX +".threshold.index.debug"; + public static final String INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE = INDEX_INDEXING_SLOWLOG_PREFIX +".threshold.index.trace"; + public static final String INDEX_INDEXING_SLOWLOG_REFORMAT = INDEX_INDEXING_SLOWLOG_PREFIX +".reformat"; + public static final String INDEX_INDEXING_SLOWLOG_LEVEL = INDEX_INDEXING_SLOWLOG_PREFIX +".level"; + IndexingSlowLog(Settings indexSettings) { this.reformat = indexSettings.getAsBoolean(INDEX_INDEXING_SLOWLOG_REFORMAT, true); - this.indexWarnThreshold = indexSettings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, TimeValue.timeValueNanos(-1)).nanos(); this.indexInfoThreshold = indexSettings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, TimeValue.timeValueNanos(-1)).nanos(); this.indexDebugThreshold = indexSettings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, TimeValue.timeValueNanos(-1)).nanos(); @@ -106,20 +65,49 @@ public class ShardSlowLogIndexingService extends AbstractIndexShardComponent { this.level = indexSettings.get(INDEX_INDEXING_SLOWLOG_LEVEL, "TRACE").toUpperCase(Locale.ROOT); - this.indexLogger = Loggers.getLogger(logger, ".index"); - this.deleteLogger = Loggers.getLogger(logger, ".delete"); + this.indexLogger = Loggers.getLogger(INDEX_INDEXING_SLOWLOG_PREFIX +".index"); + this.deleteLogger = Loggers.getLogger(INDEX_INDEXING_SLOWLOG_PREFIX +".delete"); indexLogger.setLevel(level); deleteLogger.setLevel(level); - - indexSettingsService.addListener(new ApplySettings()); } - public void postIndex(Engine.Index index, long tookInNanos) { + synchronized void onRefreshSettings(Settings settings) { + long indexWarnThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, TimeValue.timeValueNanos(this.indexWarnThreshold)).nanos(); + if (indexWarnThreshold != this.indexWarnThreshold) { + this.indexWarnThreshold = indexWarnThreshold; + } + long indexInfoThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, TimeValue.timeValueNanos(this.indexInfoThreshold)).nanos(); + if (indexInfoThreshold != this.indexInfoThreshold) { + this.indexInfoThreshold = indexInfoThreshold; + } + long indexDebugThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, TimeValue.timeValueNanos(this.indexDebugThreshold)).nanos(); + if (indexDebugThreshold != this.indexDebugThreshold) { + this.indexDebugThreshold = indexDebugThreshold; + } + long indexTraceThreshold = settings.getAsTime(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE, TimeValue.timeValueNanos(this.indexTraceThreshold)).nanos(); + if (indexTraceThreshold != this.indexTraceThreshold) { + this.indexTraceThreshold = indexTraceThreshold; + } + + String level = settings.get(INDEX_INDEXING_SLOWLOG_LEVEL, this.level); + if (!level.equals(this.level)) { + this.indexLogger.setLevel(level.toUpperCase(Locale.ROOT)); + this.deleteLogger.setLevel(level.toUpperCase(Locale.ROOT)); + this.level = level; + } + + boolean reformat = settings.getAsBoolean(INDEX_INDEXING_SLOWLOG_REFORMAT, this.reformat); + if (reformat != this.reformat) { + this.reformat = reformat; + } + } + + void postIndex(Engine.Index index, long tookInNanos) { postIndexing(index.parsedDoc(), tookInNanos); } - public void postCreate(Engine.Create create, long tookInNanos) { + void postCreate(Engine.Create create, long tookInNanos) { postIndexing(create.parsedDoc(), tookInNanos); } @@ -135,12 +123,12 @@ public class ShardSlowLogIndexingService extends AbstractIndexShardComponent { } } - public static class SlowLogParsedDocumentPrinter { + final static class SlowLogParsedDocumentPrinter { private final ParsedDocument doc; private final long tookInNanos; private final boolean reformat; - public SlowLogParsedDocumentPrinter(ParsedDocument doc, long tookInNanos, boolean reformat) { + SlowLogParsedDocumentPrinter(ParsedDocument doc, long tookInNanos, boolean reformat) { this.doc = doc; this.tookInNanos = tookInNanos; this.reformat = reformat; diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java index b66ce79a8e7..2109eafaaed 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java @@ -21,15 +21,12 @@ package org.elasticsearch.index.indexing; import com.google.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -42,7 +39,7 @@ import java.util.concurrent.TimeUnit; */ public class ShardIndexingService extends AbstractIndexShardComponent { - private final ShardSlowLogIndexingService slowLog; + private final IndexingSlowLog slowLog; private final StatsHolder totalStats = new StatsHolder(); @@ -50,10 +47,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent { private volatile Map typesStats = ImmutableMap.of(); - @Inject - public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogIndexingService slowLog) { + public ShardIndexingService(ShardId shardId, Settings indexSettings) { super(shardId, indexSettings); - this.slowLog = slowLog; + this.slowLog = new IndexingSlowLog(indexSettings); } /** @@ -252,6 +248,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return stats; } + public void onRefreshSettings(Settings settings) { + slowLog.onRefreshSettings(settings); + } + static class StatsHolder { public final MeanMetric indexMetric = new MeanMetric(); public final MeanMetric deleteMetric = new MeanMetric(); diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index 22ca67b9c62..43193b47aea 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -26,14 +26,12 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.indexing.IndexingOperationListener; @@ -94,7 +92,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple } }; - @Inject public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService, ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService, IndexFieldDataService indexFieldDataService, ShardPercolateService shardPercolateService) { diff --git a/core/src/main/java/org/elasticsearch/index/search/slowlog/ShardSlowLogSearchService.java b/core/src/main/java/org/elasticsearch/index/search/stats/SearchSlowLog.java similarity index 57% rename from core/src/main/java/org/elasticsearch/index/search/slowlog/ShardSlowLogSearchService.java rename to core/src/main/java/org/elasticsearch/index/search/stats/SearchSlowLog.java index ceb1df5d68e..cfb74027a79 100644 --- a/core/src/main/java/org/elasticsearch/index/search/slowlog/ShardSlowLogSearchService.java +++ b/core/src/main/java/org/elasticsearch/index/search/stats/SearchSlowLog.java @@ -17,19 +17,14 @@ * under the License. */ -package org.elasticsearch.index.search.slowlog; +package org.elasticsearch.index.search.stats; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -38,7 +33,7 @@ import java.util.concurrent.TimeUnit; /** */ -public class ShardSlowLogSearchService extends AbstractIndexShardComponent { +public final class SearchSlowLog{ private boolean reformat; @@ -57,71 +52,19 @@ public class ShardSlowLogSearchService extends AbstractIndexShardComponent { private final ESLogger queryLogger; private final ESLogger fetchLogger; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN = "index.search.slowlog.threshold.query.warn"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO = "index.search.slowlog.threshold.query.info"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG = "index.search.slowlog.threshold.query.debug"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE = "index.search.slowlog.threshold.query.trace"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN = "index.search.slowlog.threshold.fetch.warn"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO = "index.search.slowlog.threshold.fetch.info"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG = "index.search.slowlog.threshold.fetch.debug"; - public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE = "index.search.slowlog.threshold.fetch.trace"; - public static final String INDEX_SEARCH_SLOWLOG_REFORMAT = "index.search.slowlog.reformat"; - public static final String INDEX_SEARCH_SLOWLOG_LEVEL = "index.search.slowlog.level"; + private static final String INDEX_SEARCH_SLOWLOG_PREFIX = "index.search.slowlog"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.query.warn"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.query.info"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.query.debug"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.query.trace"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.fetch.warn"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.fetch.info"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.fetch.debug"; + public static final String INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE = INDEX_SEARCH_SLOWLOG_PREFIX + ".threshold.fetch.trace"; + public static final String INDEX_SEARCH_SLOWLOG_REFORMAT = INDEX_SEARCH_SLOWLOG_PREFIX + ".reformat"; + public static final String INDEX_SEARCH_SLOWLOG_LEVEL = INDEX_SEARCH_SLOWLOG_PREFIX + ".level"; - class ApplySettings implements IndexSettingsService.Listener { - @Override - public synchronized void onRefreshSettings(Settings settings) { - long queryWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.queryWarnThreshold)).nanos(); - if (queryWarnThreshold != ShardSlowLogSearchService.this.queryWarnThreshold) { - ShardSlowLogSearchService.this.queryWarnThreshold = queryWarnThreshold; - } - long queryInfoThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.queryInfoThreshold)).nanos(); - if (queryInfoThreshold != ShardSlowLogSearchService.this.queryInfoThreshold) { - ShardSlowLogSearchService.this.queryInfoThreshold = queryInfoThreshold; - } - long queryDebugThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.queryDebugThreshold)).nanos(); - if (queryDebugThreshold != ShardSlowLogSearchService.this.queryDebugThreshold) { - ShardSlowLogSearchService.this.queryDebugThreshold = queryDebugThreshold; - } - long queryTraceThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.queryTraceThreshold)).nanos(); - if (queryTraceThreshold != ShardSlowLogSearchService.this.queryTraceThreshold) { - ShardSlowLogSearchService.this.queryTraceThreshold = queryTraceThreshold; - } - - long fetchWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.fetchWarnThreshold)).nanos(); - if (fetchWarnThreshold != ShardSlowLogSearchService.this.fetchWarnThreshold) { - ShardSlowLogSearchService.this.fetchWarnThreshold = fetchWarnThreshold; - } - long fetchInfoThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.fetchInfoThreshold)).nanos(); - if (fetchInfoThreshold != ShardSlowLogSearchService.this.fetchInfoThreshold) { - ShardSlowLogSearchService.this.fetchInfoThreshold = fetchInfoThreshold; - } - long fetchDebugThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.fetchDebugThreshold)).nanos(); - if (fetchDebugThreshold != ShardSlowLogSearchService.this.fetchDebugThreshold) { - ShardSlowLogSearchService.this.fetchDebugThreshold = fetchDebugThreshold; - } - long fetchTraceThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE, TimeValue.timeValueNanos(ShardSlowLogSearchService.this.fetchTraceThreshold)).nanos(); - if (fetchTraceThreshold != ShardSlowLogSearchService.this.fetchTraceThreshold) { - ShardSlowLogSearchService.this.fetchTraceThreshold = fetchTraceThreshold; - } - - String level = settings.get(INDEX_SEARCH_SLOWLOG_LEVEL, ShardSlowLogSearchService.this.level); - if (!level.equals(ShardSlowLogSearchService.this.level)) { - ShardSlowLogSearchService.this.queryLogger.setLevel(level.toUpperCase(Locale.ROOT)); - ShardSlowLogSearchService.this.fetchLogger.setLevel(level.toUpperCase(Locale.ROOT)); - ShardSlowLogSearchService.this.level = level; - } - - boolean reformat = settings.getAsBoolean(INDEX_SEARCH_SLOWLOG_REFORMAT, ShardSlowLogSearchService.this.reformat); - if (reformat != ShardSlowLogSearchService.this.reformat) { - ShardSlowLogSearchService.this.reformat = reformat; - } - } - } - - @Inject - public ShardSlowLogSearchService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService) { - super(shardId, indexSettings); + SearchSlowLog(Settings indexSettings) { this.reformat = indexSettings.getAsBoolean(INDEX_SEARCH_SLOWLOG_REFORMAT, true); @@ -137,16 +80,14 @@ public class ShardSlowLogSearchService extends AbstractIndexShardComponent { this.level = indexSettings.get(INDEX_SEARCH_SLOWLOG_LEVEL, "TRACE").toUpperCase(Locale.ROOT); - this.queryLogger = Loggers.getLogger(logger, ".query"); - this.fetchLogger = Loggers.getLogger(logger, ".fetch"); + this.queryLogger = Loggers.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".query"); + this.fetchLogger = Loggers.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".fetch"); queryLogger.setLevel(level); fetchLogger.setLevel(level); - - indexSettingsService.addListener(new ApplySettings()); } - public void onQueryPhase(SearchContext context, long tookInNanos) { + void onQueryPhase(SearchContext context, long tookInNanos) { if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) { queryLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat)); } else if (queryInfoThreshold >= 0 && tookInNanos > queryInfoThreshold) { @@ -158,7 +99,7 @@ public class ShardSlowLogSearchService extends AbstractIndexShardComponent { } } - public void onFetchPhase(SearchContext context, long tookInNanos) { + void onFetchPhase(SearchContext context, long tookInNanos) { if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) { fetchLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat)); } else if (fetchInfoThreshold >= 0 && tookInNanos > fetchInfoThreshold) { @@ -170,7 +111,55 @@ public class ShardSlowLogSearchService extends AbstractIndexShardComponent { } } - public static class SlowLogSearchContextPrinter { + synchronized void onRefreshSettings(Settings settings) { + long queryWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, TimeValue.timeValueNanos(this.queryWarnThreshold)).nanos(); + if (queryWarnThreshold != this.queryWarnThreshold) { + this.queryWarnThreshold = queryWarnThreshold; + } + long queryInfoThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO, TimeValue.timeValueNanos(this.queryInfoThreshold)).nanos(); + if (queryInfoThreshold != this.queryInfoThreshold) { + this.queryInfoThreshold = queryInfoThreshold; + } + long queryDebugThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG, TimeValue.timeValueNanos(this.queryDebugThreshold)).nanos(); + if (queryDebugThreshold != this.queryDebugThreshold) { + this.queryDebugThreshold = queryDebugThreshold; + } + long queryTraceThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE, TimeValue.timeValueNanos(this.queryTraceThreshold)).nanos(); + if (queryTraceThreshold != this.queryTraceThreshold) { + this.queryTraceThreshold = queryTraceThreshold; + } + + long fetchWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN, TimeValue.timeValueNanos(this.fetchWarnThreshold)).nanos(); + if (fetchWarnThreshold != this.fetchWarnThreshold) { + this.fetchWarnThreshold = fetchWarnThreshold; + } + long fetchInfoThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO, TimeValue.timeValueNanos(this.fetchInfoThreshold)).nanos(); + if (fetchInfoThreshold != this.fetchInfoThreshold) { + this.fetchInfoThreshold = fetchInfoThreshold; + } + long fetchDebugThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG, TimeValue.timeValueNanos(this.fetchDebugThreshold)).nanos(); + if (fetchDebugThreshold != this.fetchDebugThreshold) { + this.fetchDebugThreshold = fetchDebugThreshold; + } + long fetchTraceThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE, TimeValue.timeValueNanos(this.fetchTraceThreshold)).nanos(); + if (fetchTraceThreshold != this.fetchTraceThreshold) { + this.fetchTraceThreshold = fetchTraceThreshold; + } + + String level = settings.get(INDEX_SEARCH_SLOWLOG_LEVEL, this.level); + if (!level.equals(this.level)) { + this.queryLogger.setLevel(level.toUpperCase(Locale.ROOT)); + this.fetchLogger.setLevel(level.toUpperCase(Locale.ROOT)); + this.level = level; + } + + boolean reformat = settings.getAsBoolean(INDEX_SEARCH_SLOWLOG_REFORMAT, this.reformat); + if (reformat != this.reformat) { + this.reformat = reformat; + } + } + + private static class SlowLogSearchContextPrinter { private final SearchContext context; private final long tookInNanos; private final boolean reformat; diff --git a/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java b/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java similarity index 91% rename from core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java rename to core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index f8c00a03cba..ade73676709 100644 --- a/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java +++ b/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -21,15 +21,10 @@ package org.elasticsearch.index.search.stats; import com.google.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.internal.SearchContext; import java.util.HashMap; @@ -38,19 +33,15 @@ import java.util.concurrent.TimeUnit; /** */ -public class ShardSearchService extends AbstractIndexShardComponent { - - private final ShardSlowLogSearchService slowLogSearchService; +public final class ShardSearchStats { + private final SearchSlowLog slowLogSearchService; private final StatsHolder totalStats = new StatsHolder(); private final CounterMetric openContexts = new CounterMetric(); - private volatile Map groupsStats = ImmutableMap.of(); - @Inject - public ShardSearchService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogSearchService slowLogSearchService) { - super(shardId, indexSettings); - this.slowLogSearchService = slowLogSearchService; + public ShardSearchStats(Settings indexSettings) { + this.slowLogSearchService = new SearchSlowLog(indexSettings); } /** @@ -178,7 +169,11 @@ public class ShardSearchService extends AbstractIndexShardComponent { openContexts.dec(); } - static class StatsHolder { + public void onRefreshSettings(Settings settings) { + slowLogSearchService.onRefreshSettings(settings); + } + + final static class StatsHolder { public final MeanMetric queryMetric = new MeanMetric(); public final MeanMetric fetchMetric = new MeanMetric(); public final CounterMetric queryCurrent = new CounterMetric(); diff --git a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index d6f52967523..650b1a79dd5 100644 --- a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -31,8 +31,8 @@ import org.elasticsearch.cluster.settings.Validator; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; -import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService; +import org.elasticsearch.index.indexing.IndexingSlowLog; +import org.elasticsearch.index.search.stats.SearchSlowLog; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.store.IndexStore; @@ -78,22 +78,22 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_REFORMAT); - indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_LEVEL); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE, Validator.TIME); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_REFORMAT); - indexDynamicSettings.addDynamicSetting(ShardSlowLogSearchService.INDEX_SEARCH_SLOWLOG_LEVEL); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE, Validator.TIME); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT); + indexDynamicSettings.addDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE, Validator.TIME); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_REFORMAT); + indexDynamicSettings.addDynamicSetting(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL); indexDynamicSettings.addDynamicSetting(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE, Validator.INTEGER); indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, Validator.DOUBLE); indexDynamicSettings.addDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT, Validator.BYTES_SIZE); diff --git a/core/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java b/core/src/main/java/org/elasticsearch/index/shard/CommitPoint.java similarity index 99% rename from core/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java rename to core/src/main/java/org/elasticsearch/index/shard/CommitPoint.java index c7203850ba1..bae4ae8baf5 100644 --- a/core/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java +++ b/core/src/main/java/org/elasticsearch/index/shard/CommitPoint.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway; +package org.elasticsearch.index.shard; import com.google.common.collect.ImmutableList; import org.elasticsearch.common.Nullable; diff --git a/core/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java b/core/src/main/java/org/elasticsearch/index/shard/CommitPoints.java similarity index 99% rename from core/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java rename to core/src/main/java/org/elasticsearch/index/shard/CommitPoints.java index 5ad4babd2dc..a56a0f62c24 100644 --- a/core/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java +++ b/core/src/main/java/org/elasticsearch/index/shard/CommitPoints.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway; +package org.elasticsearch.index.shard; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index af121a6f707..d3ee32606c4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -51,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; @@ -59,7 +59,6 @@ import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; import org.elasticsearch.index.cache.filter.FilterCacheStats; -import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; @@ -81,7 +80,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; -import org.elasticsearch.index.search.stats.ShardSearchService; +import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; @@ -100,6 +99,7 @@ import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.InternalIndicesLifecycle; +import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat; import org.elasticsearch.search.suggest.completion.CompletionStats; @@ -131,10 +131,9 @@ public class IndexShard extends AbstractIndexShardComponent { private final MergeSchedulerConfig mergeSchedulerConfig; private final IndexAliasesService indexAliasesService; private final ShardIndexingService indexingService; - private final ShardSearchService searchService; + private final ShardSearchStats searchService; private final ShardGetService getService; private final ShardIndexWarmerService shardWarmerService; - private final ShardFilterCache shardFilterCache; private final ShardQueryCache shardQueryCache; private final ShardFieldData shardFieldData; private final PercolatorQueriesRegistry percolatorQueriesRegistry; @@ -148,7 +147,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final Object mutex = new Object(); private final String checkIndexOnStartup; - private final NodeEnvironment nodeEnv; private final CodecService codecService; private final IndicesWarmer warmer; private final SnapshotDeletionPolicy deletionPolicy; @@ -156,6 +154,8 @@ public class IndexShard extends AbstractIndexShardComponent { private final EngineConfig engineConfig; private final TranslogConfig translogConfig; private final MergePolicyConfig mergePolicyConfig; + private final IndicesFilterCache indicesFilterCache; + private final StoreRecoveryService storeRecoveryService; private TimeValue refreshInterval; @@ -191,13 +191,12 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexShardOperationCounter indexShardOperationCounter; @Inject - public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, - ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, - ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService, + public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService, + ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, + IndicesFilterCache indicesFilterCache, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, - ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, - ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path, BigArrays bigArrays) { + ClusterService clusterService, ShardPath path, BigArrays bigArrays) { super(shardId, indexSettingsService.getSettings()); this.codecService = codecService; this.warmer = warmer; @@ -209,31 +208,31 @@ public class IndexShard extends AbstractIndexShardComponent { this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService = indexSettingsService; this.store = store; + this.storeRecoveryService = storeRecoveryService; this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings); this.threadPool = threadPool; this.mapperService = mapperService; this.queryParserService = queryParserService; this.indexCache = indexCache; this.indexAliasesService = indexAliasesService; - this.indexingService = indexingService; + this.indexingService = new ShardIndexingService(shardId, indexSettings); this.getService = new ShardGetService(this, mapperService); this.termVectorsService = termVectorsService.setIndexShard(this); - this.searchService = searchService; - this.shardWarmerService = shardWarmerService; - this.shardFilterCache = shardFilterCache; - this.shardQueryCache = shardQueryCache; - this.shardFieldData = shardFieldData; - this.percolatorQueriesRegistry = percolatorQueriesRegistry; + this.searchService = new ShardSearchStats(indexSettings); + this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); + this.indicesFilterCache = indicesFilterCache; + this.shardQueryCache = new ShardQueryCache(shardId, indexSettings); + this.shardFieldData = new ShardFieldData(); + this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService); this.shardPercolateService = shardPercolateService; this.indexFieldDataService = indexFieldDataService; this.indexService = indexService; - this.shardBitsetFilterCache = shardBitsetFilterCache; + this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState(); this.localNode = clusterService.localNode(); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true); - this.nodeEnv = nodeEnv; indexSettingsService.addListener(applyRefreshSettings); this.mapperAnalyzer = new MapperAnalyzer(mapperService); this.path = path; @@ -292,7 +291,7 @@ public class IndexShard extends AbstractIndexShardComponent { return indexService; } - public ShardSearchService searchService() { + public ShardSearchStats searchService() { return this.searchService; } @@ -300,10 +299,6 @@ public class IndexShard extends AbstractIndexShardComponent { return this.shardWarmerService; } - public ShardFilterCache filterCache() { - return this.shardFilterCache; - } - public ShardQueryCache queryCache() { return this.shardQueryCache; } @@ -625,7 +620,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public FilterCacheStats filterCacheStats() { - return shardFilterCache.stats(); + return indicesFilterCache.getStats(shardId); } public FieldDataStats fieldDataStats(String... fields) { @@ -765,7 +760,7 @@ public class IndexShard extends AbstractIndexShardComponent { engine.flushAndClose(); } } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times - IOUtils.close(engine); + IOUtils.close(engine, percolatorQueriesRegistry); } } } @@ -1023,6 +1018,13 @@ public class IndexShard extends AbstractIndexShardComponent { return path; } + public void recoverFromStore(IndexShardRoutingTable shardRoutingTable, StoreRecoveryService.RecoveryListener recoveryListener) { + // we are the first primary, recover from the gateway + // if its post api allocation, the index should exists + final boolean shouldExist = shardRoutingTable.primaryAllocatedPostApi(); + storeRecoveryService.recover(this, shouldExist, recoveryListener); + } + private class ApplyRefreshSettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -1106,6 +1108,8 @@ public class IndexShard extends AbstractIndexShardComponent { } } mergePolicyConfig.onRefreshSettings(settings); + searchService.onRefreshSettings(settings); + indexingService.onRefreshSettings(settings); if (change) { refresh("apply settings"); } @@ -1345,7 +1349,7 @@ public class IndexShard extends AbstractIndexShardComponent { } }; return new EngineConfig(shardId, - threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, + threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy(), translogConfig); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 7d7074d001c..c445ce442da 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -22,24 +22,11 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; -import org.elasticsearch.index.cache.filter.ShardFilterCache; -import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; -import org.elasticsearch.index.fielddata.ShardFieldData; -import org.elasticsearch.index.gateway.IndexShardGateway; -import org.elasticsearch.index.gateway.IndexShardGatewayService; -import org.elasticsearch.index.indexing.ShardIndexingService; -import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; -import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.percolator.stats.ShardPercolateService; -import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService; -import org.elasticsearch.index.search.stats.ShardSearchService; -import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService; import org.elasticsearch.index.termvectors.ShardTermVectorsService; import org.elasticsearch.index.translog.TranslogService; -import org.elasticsearch.index.warmer.ShardIndexWarmerService; /** * The {@code IndexShardModule} module is responsible for binding the correct @@ -57,11 +44,9 @@ public class IndexShardModule extends AbstractModule { private final ShardId shardId; private final Settings settings; private final boolean primary; - private final ShardFilterCache shardFilterCache; - public IndexShardModule(ShardId shardId, boolean primary, Settings settings, ShardFilterCache shardFilterCache) { + public IndexShardModule(ShardId shardId, boolean primary, Settings settings) { this.settings = settings; - this.shardFilterCache = shardFilterCache; this.shardId = shardId; this.primary = primary; if (settings.get("index.translog.type") != null) { @@ -85,21 +70,9 @@ public class IndexShardModule extends AbstractModule { } bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, ENGINE_PREFIX, ENGINE_SUFFIX)); - bind(ShardIndexWarmerService.class).asEagerSingleton(); - bind(ShardIndexingService.class).asEagerSingleton(); - bind(ShardSlowLogIndexingService.class).asEagerSingleton(); - bind(ShardSearchService.class).asEagerSingleton(); - bind(ShardSlowLogSearchService.class).asEagerSingleton(); - bind(ShardFilterCache.class).toInstance(shardFilterCache); - bind(ShardQueryCache.class).asEagerSingleton(); - bind(ShardBitsetFilterCache.class).asEagerSingleton(); - bind(ShardFieldData.class).asEagerSingleton(); - bind(IndexShardGateway.class).asEagerSingleton(); - bind(IndexShardGatewayService.class).asEagerSingleton(); - bind(PercolatorQueriesRegistry.class).asEagerSingleton(); + bind(StoreRecoveryService.class).asEagerSingleton(); bind(ShardPercolateService.class).asEagerSingleton(); bind(ShardTermVectorsService.class).asEagerSingleton(); - bind(IndexShardSnapshotAndRestoreService.class).asEagerSingleton(); } diff --git a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotFailedException.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRecoveryException.java similarity index 77% rename from core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotFailedException.java rename to core/src/main/java/org/elasticsearch/index/shard/IndexShardRecoveryException.java index 6d468c22aeb..2e9ad5b29cd 100644 --- a/core/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewaySnapshotFailedException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRecoveryException.java @@ -17,16 +17,16 @@ * under the License. */ -package org.elasticsearch.index.gateway; +package org.elasticsearch.index.shard; +import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; /** * */ -public class IndexShardGatewaySnapshotFailedException extends IndexShardGatewayException { - - public IndexShardGatewaySnapshotFailedException(ShardId shardId, String msg, Throwable cause) { +public class IndexShardRecoveryException extends IndexShardException { + public IndexShardRecoveryException(ShardId shardId, String msg, Throwable cause) { super(shardId, msg, cause); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 4a8d3f4a843..01f5db1e126 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -23,34 +23,26 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; -import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; -import org.elasticsearch.index.cache.filter.ShardFilterCache; -import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataService; -import org.elasticsearch.index.fielddata.ShardFieldData; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.query.IndexQueryParserService; -import org.elasticsearch.index.search.stats.ShardSearchService; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.termvectors.ShardTermVectorsService; -import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesWarmer; +import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -65,27 +57,22 @@ public final class ShadowIndexShard extends IndexShard { @Inject public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService, - IndicesLifecycle indicesLifecycle, Store store, + IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, - IndexAliasesService indexAliasesService, ShardIndexingService indexingService, - ShardSearchService searchService, - ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, - ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, + IndexAliasesService indexAliasesService, IndicesFilterCache indicesFilterCache, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, - IndexService indexService, ShardQueryCache shardQueryCache, - ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer, + IndexService indexService, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, ClusterService clusterService, - NodeEnvironment nodeEnv, ShardPath path, BigArrays bigArrays) throws IOException { - super(shardId, indexSettingsService, indicesLifecycle, store, + ShardPath path, BigArrays bigArrays) throws IOException { + super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, - indexingService, searchService, shardWarmerService, shardFilterCache, - shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService, + indicesFilterCache, shardPercolateService, codecService, termVectorsService, indexFieldDataService, indexService, - shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService, - factory, clusterService, nodeEnv, path, bigArrays); + warmer, deletionPolicy, similarityService, + factory, clusterService, path, bigArrays); } /** diff --git a/core/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/shard/SnapshotStatus.java similarity index 98% rename from core/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java rename to core/src/main/java/org/elasticsearch/index/shard/SnapshotStatus.java index 324f5555ab3..f4b26cd988d 100644 --- a/core/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/shard/SnapshotStatus.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway; +package org.elasticsearch.index.shard; /** * diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java new file mode 100644 index 00000000000..32275c4c1fd --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java @@ -0,0 +1,337 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.routing.RestoreSource; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; + +/** + * + */ +public class StoreRecoveryService extends AbstractIndexShardComponent implements Closeable { + + private final MappingUpdatedAction mappingUpdatedAction; + + private final ThreadPool threadPool; + + private final ClusterService clusterService; + + private final TimeValue waitForMappingUpdatePostRecovery; + + private final CancellableThreads cancellableThreads = new CancellableThreads(); + + private static final String SETTING_MAPPING_UPDATE_WAIT_LEGACY = "index.gateway.wait_for_mapping_update_post_recovery"; + private static final String SETTING_MAPPING_UPDATE_WAIT = "index.shard.wait_for_mapping_update_post_recovery"; + private final RestoreService restoreService; + private final RepositoriesService repositoriesService; + + @Inject + public StoreRecoveryService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, + MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService, RepositoriesService repositoriesService, RestoreService restoreService) { + super(shardId, indexSettings); + this.threadPool = threadPool; + this.mappingUpdatedAction = mappingUpdatedAction; + this.restoreService = restoreService; + this.repositoriesService = repositoriesService; + this.clusterService = clusterService; + this.waitForMappingUpdatePostRecovery = indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT, indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT_LEGACY, TimeValue.timeValueSeconds(15))); + } + + public interface RecoveryListener { + void onRecoveryDone(); + + void onIgnoreRecovery(String reason); + + void onRecoveryFailed(IndexShardRecoveryException e); + } + + /** + * Recovers the state of the shard from the gateway. + */ + public void recover(final IndexShard indexShard, final boolean indexShouldExists, final RecoveryListener listener) throws IndexShardRecoveryException { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + listener.onIgnoreRecovery("shard closed"); + return; + } + if (!indexShard.routingEntry().primary()) { + listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null)); + return; + } + try { + if (indexShard.routingEntry().restoreSource() != null) { + indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource()); + } else { + indexShard.recovering("from store", RecoveryState.Type.STORE, clusterService.localNode()); + } + } catch (IllegalIndexShardStateException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already recovering + listener.onIgnoreRecovery("already in recovering process, " + e.getMessage()); + return; + } + + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + + try { + final RecoveryState recoveryState = indexShard.recoveryState(); + if (indexShard.routingEntry().restoreSource() != null) { + logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource()); + restore(indexShard, recoveryState); + } else { + logger.debug("starting recovery from shard_store ..."); + recoverFromStore(indexShard, indexShouldExists, recoveryState); + } + + // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway + // to call post recovery. + IndexShardState shardState = indexShard.state(); + assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]"; + + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("recovery completed from ").append("shard_store").append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n"); + RecoveryState.Index index = recoveryState.getIndex(); + sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") + .append(new ByteSizeValue(index.totalBytes())).append("], took[") + .append(TimeValue.timeValueMillis(index.time())).append("]\n"); + sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") + .append(new ByteSizeValue(index.recoveredBytes())).append("]\n"); + sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") + .append(new ByteSizeValue(index.reusedBytes())).append("]\n"); + sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") + .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); + sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) + .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]"); + logger.trace(sb.toString()); + } else if (logger.isDebugEnabled()) { + logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); + } + listener.onRecoveryDone(); + } catch (IndexShardRecoveryException e) { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + listener.onIgnoreRecovery("shard closed"); + return; + } + if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { + // got closed on us, just ignore this recovery + listener.onIgnoreRecovery("shard closed"); + return; + } + listener.onRecoveryFailed(e); + } catch (IndexShardClosedException e) { + listener.onIgnoreRecovery("shard closed"); + } catch (IndexShardNotStartedException e) { + listener.onIgnoreRecovery("shard closed"); + } catch (Exception e) { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + listener.onIgnoreRecovery("shard closed"); + return; + } + listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "failed recovery", e)); + } + } + }); + } + + /** + * Recovers the state of the shard from the store. + */ + private void recoverFromStore(IndexShard indexShard, boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardRecoveryException { + indexShard.prepareForIndexRecovery(); + long version = -1; + final Map typesToUpdate; + SegmentInfos si = null; + final Store store = indexShard.store(); + store.incRef(); + try { + try { + store.failIfCorrupted(); + try { + si = store.readLastCommittedSegmentsInfo(); + } catch (Throwable e) { + String files = "_unknown_"; + try { + files = Arrays.toString(store.directory().listAll()); + } catch (Throwable e1) { + files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")"; + } + if (indexShouldExists) { + throw new IndexShardRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e); + } + } + if (si != null) { + if (indexShouldExists) { + version = si.getVersion(); + } else { + // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) + // its a "new index create" API, we have to do something, so better to clean it than use same data + logger.trace("cleaning existing shard, shouldn't exists"); + IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); + writer.close(); + recoveryState.getTranslog().totalOperations(0); + } + } + } catch (Throwable e) { + throw new IndexShardRecoveryException(shardId(), "failed to fetch index version after copying it over", e); + } + recoveryState.getIndex().updateVersion(version); + + // since we recover from local, just fill the files and size + try { + final RecoveryState.Index index = recoveryState.getIndex(); + if (si != null) { + final Directory directory = store.directory(); + for (String name : Lucene.files(si)) { + long length = directory.fileLength(name); + index.addFileDetail(name, length, true); + } + } + } catch (IOException e) { + logger.debug("failed to list file details", e); + } + if (indexShouldExists == false) { + recoveryState.getTranslog().totalOperations(0); + recoveryState.getTranslog().totalOperationsOnStart(0); + } + typesToUpdate = indexShard.performTranslogRecovery(); + + indexShard.finalizeRecovery(); + String indexName = indexShard.shardId().index().name(); + for (Map.Entry entry : typesToUpdate.entrySet()) { + validateMappingUpdate(indexName, entry.getKey(), entry.getValue()); + } + indexShard.postRecovery("post recovery from shard_store"); + } catch (EngineException e) { + throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e); + } finally { + store.decRef(); + } + } + + private void validateMappingUpdate(final String indexName, final String type, Mapping update) { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference error = new AtomicReference<>(); + mappingUpdatedAction.updateMappingOnMaster(indexName, type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() { + @Override + public void onMappingUpdate() { + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + latch.countDown(); + error.set(t); + } + }); + cancellableThreads.execute(new CancellableThreads.Interruptable() { + @Override + public void run() throws InterruptedException { + try { + if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("waited for mapping update on master for [{}], yet timed out", type); + } else { + if (error.get() != null) { + throw new IndexShardRecoveryException(shardId, "Failed to propagate mappings on master post recovery", error.get()); + } + } + } catch (InterruptedException e) { + logger.debug("interrupted while waiting for mapping update"); + throw e; + } + } + }); + } + + /** + * Restores shard from {@link RestoreSource} associated with this shard in routing table + * + * @param recoveryState recovery state + */ + private void restore(final IndexShard indexShard, final RecoveryState recoveryState) { + RestoreSource restoreSource = indexShard.routingEntry().restoreSource(); + if (restoreSource == null) { + throw new IndexShardRestoreFailedException(shardId, "empty restore source"); + } + if (logger.isTraceEnabled()) { + logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId); + } + try { + recoveryState.getTranslog().totalOperations(0); + recoveryState.getTranslog().totalOperationsOnStart(0); + indexShard.prepareForIndexRecovery(); + IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); + ShardId snapshotShardId = shardId; + if (!shardId.getIndex().equals(restoreSource.index())) { + snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); + } + indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState); + indexShard.skipTranslogRecovery(true); + indexShard.finalizeRecovery(); + indexShard.postRecovery("restore done"); + restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId); + } catch (Throwable t) { + if (Lucene.isCorruptionException(t)) { + restoreService.failRestore(restoreSource.snapshotId(), shardId()); + } + throw new IndexShardRestoreFailedException(shardId, "restore failed", t); + } + } + + @Override + public void close() { + cancellableThreads.cancel("closed"); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java deleted file mode 100644 index de183c02e9f..00000000000 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.snapshots; - -import org.elasticsearch.cluster.metadata.SnapshotId; -import org.elasticsearch.cluster.routing.RestoreSource; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.engine.SnapshotFailedEngineException; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.snapshots.RestoreService; - - -/** - * Shard level snapshot and restore service - *

- * Performs snapshot and restore operations on the shard level. - */ -public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardComponent { - - private final IndexShard indexShard; - - private final RepositoriesService repositoriesService; - - private final RestoreService restoreService; - - @Inject - public IndexShardSnapshotAndRestoreService(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, RepositoriesService repositoriesService, RestoreService restoreService) { - super(shardId, indexSettings); - this.indexShard = indexShard; - this.repositoriesService = repositoriesService; - this.restoreService = restoreService; - } - - /** - * Creates shard snapshot - * - * @param snapshotId snapshot id - * @param snapshotStatus snapshot status - */ - public void snapshot(final SnapshotId snapshotId, final IndexShardSnapshotStatus snapshotStatus) { - IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(snapshotId.getRepository()); - if (!indexShard.routingEntry().primary()) { - throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); - } - if (indexShard.routingEntry().relocating()) { - // do not snapshot when in the process of relocation of primaries so we won't get conflicts - throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); - } - if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) { - // shard has just been created, or still recovering - throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); - } - - try { - // we flush first to make sure we get the latest writes snapshotted - SnapshotIndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); - try { - indexShardRepository.snapshot(snapshotId, shardId, snapshotIndexCommit, snapshotStatus); - if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("snapshot (").append(snapshotId.getSnapshot()).append(") completed to ").append(indexShardRepository).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n"); - sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); - logger.debug(sb.toString()); - } - } finally { - snapshotIndexCommit.close(); - } - } catch (SnapshotFailedEngineException e) { - throw e; - } catch (IndexShardSnapshotFailedException e) { - throw e; - } catch (Throwable e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); - } - } - - /** - * Restores shard from {@link RestoreSource} associated with this shard in routing table - * - * @param recoveryState recovery state - */ - public void restore(final RecoveryState recoveryState) { - RestoreSource restoreSource = indexShard.routingEntry().restoreSource(); - if (restoreSource == null) { - throw new IndexShardRestoreFailedException(shardId, "empty restore source"); - } - if (logger.isTraceEnabled()) { - logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId); - } - try { - recoveryState.getTranslog().totalOperations(0); - recoveryState.getTranslog().totalOperationsOnStart(0); - indexShard.prepareForIndexRecovery(); - IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); - ShardId snapshotShardId = shardId; - if (!shardId.getIndex().equals(restoreSource.index())) { - snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); - } - indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState); - indexShard.skipTranslogRecovery(true); - indexShard.finalizeRecovery(); - indexShard.postRecovery("restore done"); - restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId); - } catch (Throwable t) { - if (Lucene.isCorruptionException(t)) { - restoreService.failRestore(restoreSource.snapshotId(), shardId()); - } - throw new IndexShardRestoreFailedException(shardId, "restore failed", t); - } - } - -} diff --git a/core/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java b/core/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java index 416198e70ec..f9b33769f81 100644 --- a/core/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java +++ b/core/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java @@ -37,8 +37,6 @@ public class ShardIndexWarmerService extends AbstractIndexShardComponent { private final CounterMetric current = new CounterMetric(); private final MeanMetric warmerMetric = new MeanMetric(); - - @Inject public ShardIndexWarmerService(ShardId shardId, @IndexSettings Settings indexSettings) { super(shardId, indexSettings); } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index eb74d2f426c..6845a35ab11 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -52,8 +52,8 @@ import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.aliases.IndexAlias; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; -import org.elasticsearch.index.gateway.IndexShardGatewayService; +import org.elasticsearch.index.shard.IndexShardRecoveryException; +import org.elasticsearch.index.shard.StoreRecoveryService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettingsService; @@ -586,7 +586,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) { final SearchContext context = createAndPutContext(request); + final ShardSearchStats shardSearchStats = context.indexShard().searchService(); try { - context.indexShard().searchService().onPreQueryPhase(context); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); @@ -327,7 +329,7 @@ public class SearchService extends AbstractLifecycleComponent { } else { contextProcessedSuccessfully(context); } - context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); + shardSearchStats.onQueryPhase(context, System.nanoTime() - time); return context.queryResult(); } catch (Throwable e) { @@ -335,7 +337,7 @@ public class SearchService extends AbstractLifecycleComponent { if (e instanceof ExecutionException) { e = e.getCause(); } - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -346,17 +348,18 @@ public class SearchService extends AbstractLifecycleComponent { public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) { final SearchContext context = findContext(request.id()); + ShardSearchStats shardSearchStats = context.indexShard().searchService(); try { - context.indexShard().searchService().onPreQueryPhase(context); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); processScroll(request, context); queryPhase.execute(context); contextProcessedSuccessfully(context); - context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); + shardSearchStats.onQueryPhase(context, System.nanoTime() - time); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -377,8 +380,9 @@ public class SearchService extends AbstractLifecycleComponent { cleanContext(context); throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } + ShardSearchStats shardSearchStats = context.indexShard().searchService(); try { - context.indexShard().searchService().onPreQueryPhase(context); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); queryPhase.execute(context); if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scroll() == null) { @@ -387,10 +391,10 @@ public class SearchService extends AbstractLifecycleComponent { } else { contextProcessedSuccessfully(context); } - context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); + shardSearchStats.onQueryPhase(context, System.nanoTime() - time); return context.queryResult(); } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -403,17 +407,18 @@ public class SearchService extends AbstractLifecycleComponent { final SearchContext context = createAndPutContext(request); contextProcessing(context); try { - context.indexShard().searchService().onPreQueryPhase(context); + ShardSearchStats shardSearchStats = context.indexShard().searchService(); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); try { loadOrExecuteQueryPhase(request, context, queryPhase); } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); throw ExceptionsHelper.convertToRuntime(e); } long time2 = System.nanoTime(); - context.indexShard().searchService().onQueryPhase(context, time2 - time); - context.indexShard().searchService().onPreFetchPhase(context); + shardSearchStats.onQueryPhase(context, time2 - time); + shardSearchStats.onPreFetchPhase(context); try { shortcutDocIdsToLoad(context); fetchPhase.execute(context); @@ -423,10 +428,10 @@ public class SearchService extends AbstractLifecycleComponent { contextProcessedSuccessfully(context); } } catch (Throwable e) { - context.indexShard().searchService().onFailedFetchPhase(context); + shardSearchStats.onFailedFetchPhase(context); throw ExceptionsHelper.convertToRuntime(e); } - context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); + shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); @@ -450,17 +455,18 @@ public class SearchService extends AbstractLifecycleComponent { throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } try { - context.indexShard().searchService().onPreQueryPhase(context); + ShardSearchStats shardSearchStats = context.indexShard().searchService(); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); try { queryPhase.execute(context); } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); throw ExceptionsHelper.convertToRuntime(e); } long time2 = System.nanoTime(); - context.indexShard().searchService().onQueryPhase(context, time2 - time); - context.indexShard().searchService().onPreFetchPhase(context); + shardSearchStats.onQueryPhase(context, time2 - time); + shardSearchStats.onPreFetchPhase(context); try { shortcutDocIdsToLoad(context); fetchPhase.execute(context); @@ -470,10 +476,10 @@ public class SearchService extends AbstractLifecycleComponent { contextProcessedSuccessfully(context); } } catch (Throwable e) { - context.indexShard().searchService().onFailedFetchPhase(context); + shardSearchStats.onFailedFetchPhase(context); throw ExceptionsHelper.convertToRuntime(e); } - context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); + shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); @@ -488,18 +494,19 @@ public class SearchService extends AbstractLifecycleComponent { final SearchContext context = findContext(request.id()); contextProcessing(context); try { + ShardSearchStats shardSearchStats = context.indexShard().searchService(); processScroll(request, context); - context.indexShard().searchService().onPreQueryPhase(context); + shardSearchStats.onPreQueryPhase(context); long time = System.nanoTime(); try { queryPhase.execute(context); } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); + shardSearchStats.onFailedQueryPhase(context); throw ExceptionsHelper.convertToRuntime(e); } long time2 = System.nanoTime(); - context.indexShard().searchService().onQueryPhase(context, time2 - time); - context.indexShard().searchService().onPreFetchPhase(context); + shardSearchStats.onQueryPhase(context, time2 - time); + shardSearchStats.onPreFetchPhase(context); try { shortcutDocIdsToLoad(context); fetchPhase.execute(context); @@ -509,10 +516,10 @@ public class SearchService extends AbstractLifecycleComponent { contextProcessedSuccessfully(context); } } catch (Throwable e) { - context.indexShard().searchService().onFailedFetchPhase(context); + shardSearchStats.onFailedFetchPhase(context); throw ExceptionsHelper.convertToRuntime(e); } - context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); + shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); @@ -526,12 +533,13 @@ public class SearchService extends AbstractLifecycleComponent { public FetchSearchResult executeFetchPhase(ShardFetchRequest request) { final SearchContext context = findContext(request.id()); contextProcessing(context); + final ShardSearchStats shardSearchStats = context.indexShard().searchService(); try { if (request.lastEmittedDoc() != null) { context.lastEmittedDoc(request.lastEmittedDoc()); } context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - context.indexShard().searchService().onPreFetchPhase(context); + shardSearchStats.onPreFetchPhase(context); long time = System.nanoTime(); fetchPhase.execute(context); if (context.scroll() == null) { @@ -539,10 +547,10 @@ public class SearchService extends AbstractLifecycleComponent { } else { contextProcessedSuccessfully(context); } - context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time); + shardSearchStats.onFetchPhase(context, System.nanoTime() - time); return context.fetchResult(); } catch (Throwable e) { - context.indexShard().searchService().onFailedFetchPhase(context); + shardSearchStats.onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 1536f05f073..862eb003093 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -49,6 +49,8 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.StoreRecoveryService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -79,10 +81,10 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX * method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link org.elasticsearch.index.gateway.IndexShardGatewayService#recover(boolean, org.elasticsearch.index.gateway.IndexShardGatewayService.RecoveryListener)} + * {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null - * {@code recover} method uses {@link org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService#restore(org.elasticsearch.indices.recovery.RecoveryState)} + * {@code recover} method uses {@link StoreRecoveryService#restore(org.elasticsearch.indices.recovery.RecoveryState)} * method to start shard restore process. *

* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)}, @@ -449,7 +451,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis } /** - * This method is used by {@link org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService} to notify + * This method is used by {@link StoreRecoveryService} to notify * {@code RestoreService} about shard restore completion. * * @param snapshotId snapshot id diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index d291c68ce87..03b516f7cdc 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -29,10 +29,17 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; +import org.elasticsearch.index.engine.SnapshotFailedEngineException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -269,31 +276,75 @@ public class SnapshotShardsService extends AbstractLifecycleComponent> entry : newSnapshots.entrySet()) { for (final Map.Entry shardEntry : entry.getValue().entrySet()) { + final ShardId shardId = shardEntry.getKey(); try { - final IndexShardSnapshotAndRestoreService shardSnapshotService = indicesService.indexServiceSafe(shardEntry.getKey().getIndex()).shardInjectorSafe(shardEntry.getKey().id()) - .getInstance(IndexShardSnapshotAndRestoreService.class); + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).shard(shardId.id()); executor.execute(new AbstractRunnable() { @Override public void doRun() { - shardSnapshotService.snapshot(entry.getKey(), shardEntry.getValue()); - updateIndexShardSnapshotStatus(entry.getKey(), shardEntry.getKey(), new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)); + snapshot(indexShard, entry.getKey(), shardEntry.getValue()); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)); } @Override public void onFailure(Throwable t) { - logger.warn("[{}] [{}] failed to create snapshot", t, shardEntry.getKey(), entry.getKey()); - updateIndexShardSnapshotStatus(entry.getKey(), shardEntry.getKey(), new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))); + logger.warn("[{}] [{}] failed to create snapshot", t, shardId, entry.getKey()); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))); } }); } catch (Throwable t) { - updateIndexShardSnapshotStatus(entry.getKey(), shardEntry.getKey(), new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))); } } } } } + /** + * Creates shard snapshot + * + * @param snapshotId snapshot id + * @param snapshotStatus snapshot status + */ + private void snapshot(final IndexShard indexShard, final SnapshotId snapshotId, final IndexShardSnapshotStatus snapshotStatus) { + IndexShardRepository indexShardRepository = snapshotsService.getRepositoriesService().indexShardRepository(snapshotId.getRepository()); + ShardId shardId = indexShard.shardId(); + if (!indexShard.routingEntry().primary()) { + throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); + } + if (indexShard.routingEntry().relocating()) { + // do not snapshot when in the process of relocation of primaries so we won't get conflicts + throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); + } + if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) { + // shard has just been created, or still recovering + throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); + } + + try { + // we flush first to make sure we get the latest writes snapshotted + SnapshotIndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); + try { + indexShardRepository.snapshot(snapshotId, shardId, snapshotIndexCommit, snapshotStatus); + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("snapshot (").append(snapshotId.getSnapshot()).append(") completed to ").append(indexShardRepository).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n"); + sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); + logger.debug(sb.toString()); + } + } finally { + snapshotIndexCommit.close(); + } + } catch (SnapshotFailedEngineException e) { + throw e; + } catch (IndexShardSnapshotFailedException e) { + throw e; + } catch (Throwable e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); + } + } + /** * Checks if any shards were processed that the new master doesn't know about * @param event diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1c99c576ff8..5d4eae62b29 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1068,6 +1068,10 @@ public class SnapshotsService extends AbstractLifecycleComponent segments = engine.segments(false); assertThat(segments.isEmpty(), equalTo(true)); assertThat(engine.segmentsStats().getCount(), equalTo(0l)); @@ -384,9 +377,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { } public void testVerboseSegments() throws Exception { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); try (Store store = createStore(); - Engine engine = createEngine(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) { List segments = engine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); @@ -417,9 +409,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testSegmentsWithMergeFlag() throws Exception { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); try (Store store = createStore(); - Engine engine = createEngine(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); @@ -686,9 +677,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { } public void testSyncedFlush() throws IOException { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); try (Store store = createStore(); - Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new LogByteSizeMergePolicy()), false)) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); @@ -904,9 +894,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { } public void testForceMerge() throws IOException { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); try (Store store = createStore(); - Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { @@ -1355,9 +1344,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Slow @Test public void testEnableGcDeletes() throws Exception { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); try (Store store = createStore(); - Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { engine.config().setEnableGcDeletes(false); // Add document @@ -1577,9 +1565,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { // Tiny indexing buffer: Settings indexSettings = Settings.builder().put(defaultSettings) .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build(); - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), indexSettings); try (Store store = createStore(); - Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), + Engine engine = new InternalEngine(config(indexSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { for (int i = 0; i < 100; i++) { String id = Integer.toString(i); @@ -1628,9 +1615,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { // expected } // now it should be OK. - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings) - .put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build()); - engine = createEngine(indexSettingsService, store, primaryTranslogDir, new MergeSchedulerConfig(defaultSettings), newMergePolicy()); + Settings indexSettings = Settings.builder().put(defaultSettings).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build(); + engine = createEngine(indexSettings, store, primaryTranslogDir, new MergeSchedulerConfig(indexSettings), newMergePolicy()); } public void testTranslogReplayWithFailure() throws IOException { @@ -1882,7 +1868,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { /* create a TranslogConfig that has been created with a different UUID */ TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); - EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettingsService() + EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener() , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index fe831a98fb3..aef646da9c1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.indexing.ShardIndexingService; -import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; @@ -115,6 +114,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { .put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us .put(EngineConfig.INDEX_CODEC_SETTING, codecName) .put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .build(); // TODO randomize more settings threadPool = new ThreadPool(getClass().getName()); dirPath = createTempDir(); @@ -198,31 +198,29 @@ public class ShadowEngineTests extends ElasticsearchTestCase { protected ShadowEngine createShadowEngine(Store store) { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); - return createShadowEngine(indexSettingsService, store); + return createShadowEngine(defaultSettings, store); } protected InternalEngine createInternalEngine(Store store, Path translogPath) { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); - return createInternalEngine(indexSettingsService, store, translogPath); + return createInternalEngine(defaultSettings, store, translogPath); } - protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store) { - return new ShadowEngine(config(indexSettingsService, store, null, new MergeSchedulerConfig(indexSettingsService.indexSettings()), null)); + protected ShadowEngine createShadowEngine(Settings indexSettings, Store store) { + return new ShadowEngine(config(indexSettings, store, null, new MergeSchedulerConfig(indexSettings), null)); } - protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath) { - return createInternalEngine(indexSettingsService, store, translogPath, newMergePolicy()); + protected InternalEngine createInternalEngine(Settings indexSettings, Store store, Path translogPath) { + return createInternalEngine(indexSettings, store, translogPath, newMergePolicy()); } - protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettingsService, store, translogPath, new MergeSchedulerConfig(indexSettingsService.indexSettings()), mergePolicy), true); + protected InternalEngine createInternalEngine(Settings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { + return new InternalEngine(config(indexSettings, store, translogPath, new MergeSchedulerConfig(indexSettings), mergePolicy), true); } - public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { + public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { IndexWriterConfig iwc = newIndexWriterConfig(); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettingsService.getSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); - EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); + EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig, iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() { @Override @@ -268,9 +266,8 @@ public class ShadowEngineTests extends ElasticsearchTestCase { @Test public void testSegments() throws Exception { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); primaryEngine.close(); // recreate without merging - primaryEngine = createInternalEngine(indexSettingsService, store, createTempDir(), NoMergePolicy.INSTANCE); + primaryEngine = createInternalEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE); List segments = primaryEngine.segments(false); assertThat(segments.isEmpty(), equalTo(true)); assertThat(primaryEngine.segmentsStats().getCount(), equalTo(0l)); @@ -440,9 +437,8 @@ public class ShadowEngineTests extends ElasticsearchTestCase { @Test public void testVerboseSegments() throws Exception { - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); primaryEngine.close(); // recreate without merging - primaryEngine = createInternalEngine(indexSettingsService, store, createTempDir(), NoMergePolicy.INSTANCE); + primaryEngine = createInternalEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE); List segments = primaryEngine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); diff --git a/core/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java b/core/src/test/java/org/elasticsearch/index/shard/CommitPointsTests.java similarity index 98% rename from core/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java rename to core/src/test/java/org/elasticsearch/index/shard/CommitPointsTests.java index f3182fa7a65..64c53861a59 100644 --- a/core/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/CommitPointsTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway; +package org.elasticsearch.index.shard; import com.google.common.base.Charsets; import com.google.common.collect.Lists; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java index 8238c159df2..b5ac0cce405 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java @@ -41,10 +41,10 @@ public class IndexShardModuleTests extends ElasticsearchTestCase { .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) .build(); - IndexShardModule ism1 = new IndexShardModule(shardId, true, regularSettings, null); - IndexShardModule ism2 = new IndexShardModule(shardId, false, regularSettings, null); - IndexShardModule ism3 = new IndexShardModule(shardId, true, shadowSettings, null); - IndexShardModule ism4 = new IndexShardModule(shardId, false, shadowSettings, null); + IndexShardModule ism1 = new IndexShardModule(shardId, true, regularSettings); + IndexShardModule ism2 = new IndexShardModule(shardId, false, regularSettings); + IndexShardModule ism3 = new IndexShardModule(shardId, true, shadowSettings); + IndexShardModule ism4 = new IndexShardModule(shardId, false, shadowSettings); assertFalse("no shadow replicas for normal settings", ism1.useShadowEngine()); assertFalse("no shadow replicas for normal settings", ism2.useShadowEngine()); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java index 710434c4858..0adbcbeb831 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java @@ -164,7 +164,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest { ShardRecoveryResponse shardResponse = shardResponses.get(0); RecoveryState state = shardResponse.recoveryState(); - assertRecoveryState(state, 0, Type.GATEWAY, Stage.DONE, node, node, false); + assertRecoveryState(state, 0, Type.STORE, Stage.DONE, node, node, false); validateIndexRecoveryState(state.getIndex()); } @@ -219,7 +219,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest { // validate node A recovery ShardRecoveryResponse nodeAShardResponse = nodeAResponses.get(0); - assertRecoveryState(nodeAShardResponse.recoveryState(), 0, Type.GATEWAY, Stage.DONE, nodeA, nodeA, false); + assertRecoveryState(nodeAShardResponse.recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); validateIndexRecoveryState(nodeAShardResponse.recoveryState().getIndex()); // validate node B recovery @@ -272,7 +272,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest { List nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); assertThat(nodeBResponses.size(), equalTo(1)); - assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.GATEWAY, Stage.DONE, nodeA, nodeA, false); + assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex()); assertOnGoingRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, nodeA, nodeB, false); diff --git a/docs/reference/cat/recovery.asciidoc b/docs/reference/cat/recovery.asciidoc index 1c9fe9e60f8..857e631cec2 100644 --- a/docs/reference/cat/recovery.asciidoc +++ b/docs/reference/cat/recovery.asciidoc @@ -6,7 +6,7 @@ completed. It is a more compact view of the JSON <> A A recovery event occurs anytime an index shard moves to a different node in the cluster. This can happen during a snapshot recovery, a change in replication level, node failure, or -on node startup. This last type is called a local gateway recovery and is the normal +on node startup. This last type is called a local store recovery and is the normal way for shards to be loaded from disk when a node starts up. As an example, here is what the recovery state of a cluster may look like when there @@ -16,13 +16,13 @@ are no shards in transit from one node to another: ---------------------------------------------------------------------------- > curl -XGET 'localhost:9200/_cat/recovery?v' index shard time type stage source target files percent bytes percent -wiki 0 73 gateway done hostA hostA 36 100.0% 24982806 100.0% -wiki 1 245 gateway done hostA hostA 33 100.0% 24501912 100.0% -wiki 2 230 gateway done hostA hostA 36 100.0% 30267222 100.0% +wiki 0 73 store done hostA hostA 36 100.0% 24982806 100.0% +wiki 1 245 store done hostA hostA 33 100.0% 24501912 100.0% +wiki 2 230 store done hostA hostA 36 100.0% 30267222 100.0% --------------------------------------------------------------------------- In the above case, the source and target nodes are the same because the recovery -type was gateway, i.e. they were read from local storage on node start. +type was store, i.e. they were read from local storage on node start. Now let's see what a live recovery looks like. By increasing the replica count of our index and bringing another node online to host the replicas, we can see @@ -35,12 +35,12 @@ what a live shard recovery looks like. > curl -XGET 'localhost:9200/_cat/recovery?v' index shard time type stage source target files percent bytes percent -wiki 0 1252 gateway done hostA hostA 4 100.0% 23638870 100.0% +wiki 0 1252 store done hostA hostA 4 100.0% 23638870 100.0% wiki 0 1672 replica index hostA hostB 4 75.0% 23638870 48.8% wiki 1 1698 replica index hostA hostB 4 75.0% 23348540 49.4% -wiki 1 4812 gateway done hostA hostA 33 100.0% 24501912 100.0% +wiki 1 4812 store done hostA hostA 33 100.0% 24501912 100.0% wiki 2 1689 replica index hostA hostB 4 75.0% 28681851 40.2% -wiki 2 5317 gateway done hostA hostA 36 100.0% 30267222 100.0% +wiki 2 5317 store done hostA hostA 36 100.0% 30267222 100.0% ---------------------------------------------------------------------------- We can see in the above listing that our 3 initial shards are in various stages diff --git a/docs/reference/cluster/pending.asciidoc b/docs/reference/cluster/pending.asciidoc index 0e018e414ff..4997a035ba2 100644 --- a/docs/reference/cluster/pending.asciidoc +++ b/docs/reference/cluster/pending.asciidoc @@ -28,14 +28,14 @@ like this: { "insert_order": 46, "priority": "HIGH", - "source": "shard-started ([foo_2][1], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from gateway]", + "source": "shard-started ([foo_2][1], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from shard_store]", "time_in_queue_millis": 842, "time_in_queue": "842ms" }, { "insert_order": 45, "priority": "HIGH", - "source": "shard-started ([foo_2][0], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from gateway]", + "source": "shard-started ([foo_2][0], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from shard_store]", "time_in_queue_millis": 858, "time_in_queue": "858ms" } diff --git a/docs/reference/docs/delete.asciidoc b/docs/reference/docs/delete.asciidoc index d157b9efc43..976b73c8633 100644 --- a/docs/reference/docs/delete.asciidoc +++ b/docs/reference/docs/delete.asciidoc @@ -125,7 +125,7 @@ down indexing). The primary shard assigned to perform the delete operation might not be available when the delete operation is executed. Some reasons for this -might be that the primary shard is currently recovering from a gateway +might be that the primary shard is currently recovering from a store or undergoing relocation. By default, the delete operation will wait on the primary shard to become available for up to 1 minute before failing and responding with an error. The `timeout` parameter can be used to diff --git a/docs/reference/indices/recovery.asciidoc b/docs/reference/indices/recovery.asciidoc index defc86d25c1..c4aabac3ac3 100644 --- a/docs/reference/indices/recovery.asciidoc +++ b/docs/reference/indices/recovery.asciidoc @@ -101,7 +101,7 @@ Response: "index1" : { "shards" : [ { "id" : 0, - "type" : "GATEWAY", + "type" : "STORE", "stage" : "DONE", "primary" : true, "start_time" : "2014-02-24T12:38:06.349", @@ -189,7 +189,7 @@ Description of output fields: [horizontal] `id`:: Shard ID `type`:: Recovery type: - * gateway + * store * snapshot * replica * relocating diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.recovery/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.recovery/10_basic.yaml index 89b1fb8c765..8ac01eaf317 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.recovery/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.recovery/10_basic.yaml @@ -30,7 +30,7 @@ index1 \s+ \d \s+ # shard \d+ \s+ # time - (gateway|replica|snapshot|relocating) \s+ # type + (store|replica|snapshot|relocating) \s+ # type (init|index|start|translog|finalize|done) \s+ # stage [-\w./]+ \s+ # source_host [-\w./]+ \s+ # target_host diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.recovery/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.recovery/10_basic.yaml index f24ab554126..c37aab1cb94 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.recovery/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.recovery/10_basic.yaml @@ -17,7 +17,7 @@ indices.recovery: index: [test_1] - - match: { test_1.shards.0.type: "GATEWAY" } + - match: { test_1.shards.0.type: "STORE" } - match: { test_1.shards.0.stage: "DONE" } - match: { test_1.shards.0.primary: true } - match: { test_1.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }