diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index a95fa74957a..05d2df7cd57 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.index.warmer.WarmerStats; import java.io.IOException; @@ -64,6 +65,9 @@ public class CommonStats implements Streamable, ToXContent { @Nullable FlushStats flush; + @Nullable + WarmerStats warmer; + public void add(CommonStats stats) { if (docs == null) { if (stats.docs() != null) { @@ -129,6 +133,14 @@ public class CommonStats implements Streamable, ToXContent { } else { flush.add(stats.flush()); } + if (warmer == null) { + if (stats.warmer() != null) { + warmer = new WarmerStats(); + warmer.add(stats.warmer()); + } + } else { + warmer.add(stats.warmer()); + } } @Nullable @@ -211,6 +223,16 @@ public class CommonStats implements Streamable, ToXContent { return flush; } + @Nullable + public WarmerStats warmer() { + return this.warmer; + } + + @Nullable + public WarmerStats getWarmer() { + return this.warmer; + } + public static CommonStats readCommonStats(StreamInput in) throws IOException { CommonStats stats = new CommonStats(); stats.readFrom(in); @@ -243,6 +265,9 @@ public class CommonStats implements Streamable, ToXContent { if (in.readBoolean()) { flush = FlushStats.readFlushStats(in); } + if (in.readBoolean()) { + warmer = WarmerStats.readWarmerStats(in); + } } @Override @@ -295,6 +320,12 @@ public class CommonStats implements Streamable, ToXContent { out.writeBoolean(true); flush.writeTo(out); } + if (warmer == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + warmer.writeTo(out); + } } // note, requires a wrapping object @@ -324,6 +355,9 @@ public class CommonStats implements Streamable, ToXContent { if (flush != null) { flush.toXContent(builder, params); } + if (warmer != null) { + warmer.toXContent(builder, params); + } return builder; } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java index fb95905f39c..b4948046f77 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -44,6 +44,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { private boolean merge = false; private boolean refresh = false; private boolean flush = false; + private boolean warmer = false; private String[] types = null; private String[] groups = null; @@ -64,6 +65,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { merge = true; refresh = true; flush = true; + warmer = true; types = null; groups = null; return this; @@ -81,6 +83,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { merge = false; refresh = false; flush = false; + warmer = false; types = null; groups = null; return this; @@ -188,6 +191,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { return this.flush; } + public IndicesStatsRequest warmer(boolean warmer) { + this.warmer = warmer; + return this; + } + + public boolean warmer() { + return this.warmer; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -199,6 +211,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { out.writeBoolean(merge); out.writeBoolean(flush); out.writeBoolean(refresh); + out.writeBoolean(warmer); if (types == null) { out.writeVInt(0); } else { @@ -228,6 +241,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest { merge = in.readBoolean(); flush = in.readBoolean(); refresh = in.readBoolean(); + warmer = in.readBoolean(); int size = in.readVInt(); if (size > 0) { types = new String[size]; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index d40214f7595..a1ad93589ed 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -116,6 +116,11 @@ public class IndicesStatsRequestBuilder extends BaseIndicesRequestBuilder listener) { client.stats(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 4a6794004b3..b0417bf5be2 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -169,6 +169,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi if (request.request.flush()) { stats.stats.flush = indexShard.flushStats(); } + if (request.request.warmer()) { + stats.stats.warmer = indexShard.warmerStats(); + } return stats; } diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 78d0159f0b1..9fcd8d8875d 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.ExtendedIndexSearcher; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.Filter; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.BytesHolder; @@ -154,6 +155,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent { ExtendedIndexSearcher searcher(); } + static class SimpleSearcher implements Searcher { + + private final IndexSearcher searcher; + + public SimpleSearcher(IndexSearcher searcher) { + this.searcher = searcher; + } + + @Override + public IndexReader reader() { + return searcher.getIndexReader(); + } + + @Override + public ExtendedIndexSearcher searcher() { + return (ExtendedIndexSearcher) searcher; + } + + @Override + public boolean release() throws ElasticSearchException { + // nothing to release here... + return true; + } + } + static class Refresh { private final boolean waitForOperations; diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index b92acf0dbda..1671b06dae3 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -25,6 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.bloom.BloomFilter; @@ -54,6 +55,7 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; +import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -94,6 +96,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final IndexSettingsService indexSettingsService; + @Nullable + private final InternalIndicesWarmer warmer; + private final Store store; private final SnapshotDeletionPolicy deletionPolicy; @@ -152,7 +157,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, - IndexSettingsService indexSettingsService, + IndexSettingsService indexSettingsService, @Nullable InternalIndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService, @@ -170,6 +175,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { this.threadPool = threadPool; this.indexSettingsService = indexSettingsService; + this.warmer = warmer; this.store = store; this.deletionPolicy = deletionPolicy; this.translog = translog; @@ -713,7 +719,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { public Searcher searcher() throws EngineException { SearcherManager manager = this.searcherManager; IndexSearcher searcher = manager.acquire(); - return new RobinSearchResult(searcher, manager); + return new RobinSearcher(searcher, manager); } @Override @@ -1353,12 +1359,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { return new SearcherManager(indexWriter, true, searcherFactory); } - static class RobinSearchResult implements Searcher { + static class RobinSearcher implements Searcher { private final IndexSearcher searcher; private final SearcherManager manager; - private RobinSearchResult(IndexSearcher searcher, SearcherManager manager) { + private RobinSearcher(IndexSearcher searcher, SearcherManager manager) { this.searcher = searcher; this.manager = manager; } @@ -1420,6 +1426,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { public IndexSearcher newSearcher(IndexReader reader) throws IOException { ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader); searcher.setSimilarity(similarityService.defaultSearchSimilarity()); + if (warmer != null) { + warmer.warm(shardId, new SimpleSearcher(searcher)); + } return searcher; } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index e4a055d6fd3..5f8717138b4 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.jmx.JmxService; /** @@ -46,5 +47,6 @@ public class IndexShardModule extends AbstractModule { if (JmxService.shouldExport(settings)) { bind(IndexShardManagement.class).asEagerSingleton(); } + bind(ShardIndexWarmerService.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 2139aa1da81..0545ef1d837 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -40,6 +40,8 @@ import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.index.warmer.ShardIndexWarmerService; +import org.elasticsearch.index.warmer.WarmerStats; /** * @@ -52,6 +54,8 @@ public interface IndexShard extends IndexShardComponent { ShardSearchService searchService(); + ShardIndexWarmerService warmerService(); + ShardRouting routingEntry(); DocsStats docStats(); @@ -70,6 +74,8 @@ public interface IndexShard extends IndexShardComponent { FlushStats flushStats(); + WarmerStats warmerStats(); + IndexShardState state(); Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 02a23cd8017..11a3d021b1e 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -65,6 +65,8 @@ import org.elasticsearch.index.shard.*; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.warmer.ShardIndexWarmerService; +import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.recovery.RecoveryStatus; @@ -111,6 +113,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final ShardGetService getService; + private final ShardIndexWarmerService shardWarmerService; + private final Object mutex = new Object(); private final String checkIndexOnStartup; @@ -137,7 +141,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, - ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService) { + ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService) { super(shardId, indexSettings); this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService = indexSettingsService; @@ -153,6 +157,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I this.indexingService = indexingService; this.getService = getService.setIndexShard(this); this.searchService = searchService; + this.shardWarmerService = shardWarmerService; state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval())); @@ -195,6 +200,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this.searchService; } + @Override + public ShardIndexWarmerService warmerService() { + return this.shardWarmerService; + } + @Override public ShardRouting routingEntry() { return this.shardRouting; @@ -495,6 +505,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return mergeScheduler.stats(); } + @Override + public WarmerStats warmerStats() { + return shardWarmerService.stats(); + } + @Override public void flush(Engine.Flush flush) throws ElasticSearchException { // we allows flush while recovering, since we allow for operations to happen diff --git a/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java b/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java new file mode 100644 index 00000000000..c386bba1690 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/warmer/ShardIndexWarmerService.java @@ -0,0 +1,57 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.warmer; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class ShardIndexWarmerService extends AbstractIndexShardComponent { + + private final CounterMetric current = new CounterMetric(); + private final MeanMetric warmerMetric = new MeanMetric(); + + + @Inject + public ShardIndexWarmerService(ShardId shardId, @IndexSettings Settings indexSettings) { + super(shardId, indexSettings); + } + + public void onPreWarm() { + current.inc(); + } + + public void onPostWarm(long tookInNanos) { + current.dec(); + warmerMetric.inc(tookInNanos); + } + + public WarmerStats stats() { + return new WarmerStats(current.count(), warmerMetric.count(), TimeUnit.NANOSECONDS.toMillis(warmerMetric.sum())); + } +} diff --git a/src/main/java/org/elasticsearch/index/warmer/WarmerStats.java b/src/main/java/org/elasticsearch/index/warmer/WarmerStats.java new file mode 100644 index 00000000000..7369bba0593 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/warmer/WarmerStats.java @@ -0,0 +1,128 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.warmer; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +public class WarmerStats implements Streamable, ToXContent { + + private long current; + + private long total; + + private long totalTimeInMillis; + + public WarmerStats() { + + } + + public WarmerStats(long current, long total, long totalTimeInMillis) { + this.current = current; + this.total = total; + this.totalTimeInMillis = totalTimeInMillis; + } + + public void add(long current, long total, long totalTimeInMillis) { + this.current += current; + this.total += total; + this.totalTimeInMillis += totalTimeInMillis; + } + + public void add(WarmerStats warmerStats) { + if (warmerStats == null) { + return; + } + this.current += warmerStats.current; + this.total += warmerStats.total; + this.totalTimeInMillis += warmerStats.totalTimeInMillis; + } + + public long current() { + return this.current; + } + + /** + * The total number of warmer executed. + */ + public long total() { + return this.total; + } + + /** + * The total time warmer have been executed (in milliseconds). + */ + public long totalTimeInMillis() { + return this.totalTimeInMillis; + } + + /** + * The total time warmer have been executed. + */ + public TimeValue totalTime() { + return new TimeValue(totalTimeInMillis); + } + + public static WarmerStats readWarmerStats(StreamInput in) throws IOException { + WarmerStats refreshStats = new WarmerStats(); + refreshStats.readFrom(in); + return refreshStats; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.WARMER); + builder.field(Fields.CURRENT, current); + builder.field(Fields.TOTAL, total); + builder.field(Fields.TOTAL_TIME, totalTime().toString()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis); + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString WARMER = new XContentBuilderString("warmer"); + static final XContentBuilderString CURRENT = new XContentBuilderString("current"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time"); + static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis"); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + current = in.readVLong(); + total = in.readVLong(); + totalTimeInMillis = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(current); + out.writeVLong(total); + out.writeVLong(totalTimeInMillis); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 032b8bec395..89583c037dc 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -34,6 +34,8 @@ import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.ttl.IndicesTTLService; +import org.elasticsearch.indices.warmer.IndicesWarmer; +import org.elasticsearch.indices.warmer.InternalIndicesWarmer; /** * @@ -66,5 +68,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(IndicesFilterCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); + bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/indices/warmer/IndicesWarmer.java b/src/main/java/org/elasticsearch/indices/warmer/IndicesWarmer.java new file mode 100644 index 00000000000..9f700586bf0 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/warmer/IndicesWarmer.java @@ -0,0 +1,37 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.warmer; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; + +/** + */ +public interface IndicesWarmer { + + static interface Listener { + void warm(ShardId shardId, IndexMetaData indexMetaData, Engine.Searcher search); + } + + void addListener(Listener listener); + + void removeListener(Listener listener); +} diff --git a/src/main/java/org/elasticsearch/indices/warmer/InternalIndicesWarmer.java b/src/main/java/org/elasticsearch/indices/warmer/InternalIndicesWarmer.java new file mode 100644 index 00000000000..b791947e28b --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/warmer/InternalIndicesWarmer.java @@ -0,0 +1,98 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.warmer; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +/** + */ +public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer { + + private final ClusterService clusterService; + + private final IndicesService indicesService; + + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); + + @Inject + public InternalIndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) { + super(settings); + this.clusterService = clusterService; + this.indicesService = indicesService; + } + + @Override + public void addListener(Listener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(Listener listener) { + listeners.remove(listener); + } + + public void warm(ShardId shardId, Engine.Searcher searcher) { + IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name()); + if (indexMetaData == null) { + return; + } + if (!indexMetaData.settings().getAsBoolean("index.warm.enabled", true)) { + return; + } + IndexService indexService = indicesService.indexService(shardId.index().name()); + if (indexService == null) { + return; + } + IndexShard indexShard = indexService.shard(shardId.id()); + if (indexShard == null) { + return; + } + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}] warming [{}]", shardId.index().name(), shardId.id(), searcher.reader()); + } + indexShard.warmerService().onPreWarm(); + long time = System.nanoTime(); + for (Listener listener : listeners) { + try { + listener.warm(shardId, indexMetaData, searcher); + } catch (Exception e) { + logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener); + } + } + long took = System.nanoTime() - time; + indexShard.warmerService().onPostWarm(took); + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}] warming took [{}]", shardId.index().name(), shardId.id(), new TimeValue(took, TimeUnit.NANOSECONDS)); + } + } +} diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java index e0cca9b331d..1fba3e3f0b7 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java @@ -73,6 +73,9 @@ public class RestIndicesStatsAction extends BaseRestHandler { controller.registerHandler(GET, "/_stats/flush", new RestFlushStatsHandler()); controller.registerHandler(GET, "/{index}/_stats/flush", new RestFlushStatsHandler()); + + controller.registerHandler(GET, "/_stats/warmer", new RestWarmerStatsHandler()); + controller.registerHandler(GET, "/{index}/_stats/warmer", new RestWarmerStatsHandler()); } @Override @@ -99,6 +102,7 @@ public class RestIndicesStatsAction extends BaseRestHandler { indicesStatsRequest.merge(request.paramAsBoolean("merge", indicesStatsRequest.merge())); indicesStatsRequest.refresh(request.paramAsBoolean("refresh", indicesStatsRequest.refresh())); indicesStatsRequest.flush(request.paramAsBoolean("flush", indicesStatsRequest.flush())); + indicesStatsRequest.warmer(request.paramAsBoolean("warmer", indicesStatsRequest.warmer())); client.admin().indices().stats(indicesStatsRequest, new ActionListener() { @Override @@ -397,6 +401,43 @@ public class RestIndicesStatsAction extends BaseRestHandler { } } + class RestWarmerStatsHandler implements RestHandler { + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.clear().warmer(true); + indicesStatsRequest.indices(splitIndices(request.param("index"))); + indicesStatsRequest.types(splitTypes(request.param("types"))); + + client.admin().indices().stats(indicesStatsRequest, new ActionListener() { + @Override + public void onResponse(IndicesStats response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + builder.field("ok", true); + buildBroadcastShardsHeader(builder, response); + response.toXContent(builder, request); + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + } + class RestRefreshStatsHandler implements RestHandler { @Override diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java index 5288365c6d1..fd0e2b7819e 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java @@ -21,13 +21,13 @@ package org.elasticsearch.test.unit.index.engine.robin; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.bloom.none.NoneBloomCache; -import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.robin.RobinEngine; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests; import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -38,7 +38,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_ public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { protected Engine createEngine(Store store, Translog translog) { - return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), + return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); } }