allow internally to register index warmup actions, as well as expose stats on it

This commit is contained in:
Shay Banon 2012-04-29 00:37:20 +03:00
parent 6e09eab9e5
commit 8ca36c8dd5
16 changed files with 485 additions and 7 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.WarmerStats;
import java.io.IOException; import java.io.IOException;
@ -64,6 +65,9 @@ public class CommonStats implements Streamable, ToXContent {
@Nullable @Nullable
FlushStats flush; FlushStats flush;
@Nullable
WarmerStats warmer;
public void add(CommonStats stats) { public void add(CommonStats stats) {
if (docs == null) { if (docs == null) {
if (stats.docs() != null) { if (stats.docs() != null) {
@ -129,6 +133,14 @@ public class CommonStats implements Streamable, ToXContent {
} else { } else {
flush.add(stats.flush()); flush.add(stats.flush());
} }
if (warmer == null) {
if (stats.warmer() != null) {
warmer = new WarmerStats();
warmer.add(stats.warmer());
}
} else {
warmer.add(stats.warmer());
}
} }
@Nullable @Nullable
@ -211,6 +223,16 @@ public class CommonStats implements Streamable, ToXContent {
return flush; return flush;
} }
@Nullable
public WarmerStats warmer() {
return this.warmer;
}
@Nullable
public WarmerStats getWarmer() {
return this.warmer;
}
public static CommonStats readCommonStats(StreamInput in) throws IOException { public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats(); CommonStats stats = new CommonStats();
stats.readFrom(in); stats.readFrom(in);
@ -243,6 +265,9 @@ public class CommonStats implements Streamable, ToXContent {
if (in.readBoolean()) { if (in.readBoolean()) {
flush = FlushStats.readFlushStats(in); flush = FlushStats.readFlushStats(in);
} }
if (in.readBoolean()) {
warmer = WarmerStats.readWarmerStats(in);
}
} }
@Override @Override
@ -295,6 +320,12 @@ public class CommonStats implements Streamable, ToXContent {
out.writeBoolean(true); out.writeBoolean(true);
flush.writeTo(out); flush.writeTo(out);
} }
if (warmer == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
warmer.writeTo(out);
}
} }
// note, requires a wrapping object // note, requires a wrapping object
@ -324,6 +355,9 @@ public class CommonStats implements Streamable, ToXContent {
if (flush != null) { if (flush != null) {
flush.toXContent(builder, params); flush.toXContent(builder, params);
} }
if (warmer != null) {
warmer.toXContent(builder, params);
}
return builder; return builder;
} }
} }

View File

@ -44,6 +44,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
private boolean merge = false; private boolean merge = false;
private boolean refresh = false; private boolean refresh = false;
private boolean flush = false; private boolean flush = false;
private boolean warmer = false;
private String[] types = null; private String[] types = null;
private String[] groups = null; private String[] groups = null;
@ -64,6 +65,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
merge = true; merge = true;
refresh = true; refresh = true;
flush = true; flush = true;
warmer = true;
types = null; types = null;
groups = null; groups = null;
return this; return this;
@ -81,6 +83,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
merge = false; merge = false;
refresh = false; refresh = false;
flush = false; flush = false;
warmer = false;
types = null; types = null;
groups = null; groups = null;
return this; return this;
@ -188,6 +191,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
return this.flush; return this.flush;
} }
public IndicesStatsRequest warmer(boolean warmer) {
this.warmer = warmer;
return this;
}
public boolean warmer() {
return this.warmer;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
@ -199,6 +211,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
out.writeBoolean(merge); out.writeBoolean(merge);
out.writeBoolean(flush); out.writeBoolean(flush);
out.writeBoolean(refresh); out.writeBoolean(refresh);
out.writeBoolean(warmer);
if (types == null) { if (types == null) {
out.writeVInt(0); out.writeVInt(0);
} else { } else {
@ -228,6 +241,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest {
merge = in.readBoolean(); merge = in.readBoolean();
flush = in.readBoolean(); flush = in.readBoolean();
refresh = in.readBoolean(); refresh = in.readBoolean();
warmer = in.readBoolean();
int size = in.readVInt(); int size = in.readVInt();
if (size > 0) { if (size > 0) {
types = new String[size]; types = new String[size];

View File

@ -116,6 +116,11 @@ public class IndicesStatsRequestBuilder extends BaseIndicesRequestBuilder<Indice
return this; return this;
} }
public IndicesStatsRequestBuilder setWarmer(boolean warmer) {
request.warmer(warmer);
return this;
}
@Override @Override
protected void doExecute(ActionListener<IndicesStats> listener) { protected void doExecute(ActionListener<IndicesStats> listener) {
client.stats(request, listener); client.stats(request, listener);

View File

@ -169,6 +169,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
if (request.request.flush()) { if (request.request.flush()) {
stats.stats.flush = indexShard.flushStats(); stats.stats.flush = indexShard.flushStats();
} }
if (request.request.warmer()) {
stats.stats.warmer = indexShard.warmerStats();
}
return stats; return stats;
} }

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.ExtendedIndexSearcher;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.Filter; import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.BytesHolder;
@ -154,6 +155,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
ExtendedIndexSearcher searcher(); ExtendedIndexSearcher searcher();
} }
static class SimpleSearcher implements Searcher {
private final IndexSearcher searcher;
public SimpleSearcher(IndexSearcher searcher) {
this.searcher = searcher;
}
@Override
public IndexReader reader() {
return searcher.getIndexReader();
}
@Override
public ExtendedIndexSearcher searcher() {
return (ExtendedIndexSearcher) searcher;
}
@Override
public boolean release() throws ElasticSearchException {
// nothing to release here...
return true;
}
}
static class Refresh { static class Refresh {
private final boolean waitForOperations; private final boolean waitForOperations;

View File

@ -25,6 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil; import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter; import org.elasticsearch.common.bloom.BloomFilter;
@ -54,6 +55,7 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -94,6 +96,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final IndexSettingsService indexSettingsService; private final IndexSettingsService indexSettingsService;
@Nullable
private final InternalIndicesWarmer warmer;
private final Store store; private final Store store;
private final SnapshotDeletionPolicy deletionPolicy; private final SnapshotDeletionPolicy deletionPolicy;
@ -152,7 +157,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Inject @Inject
public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService, IndexSettingsService indexSettingsService, @Nullable InternalIndicesWarmer warmer,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService, AnalysisService analysisService, SimilarityService similarityService,
@ -170,6 +175,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.threadPool = threadPool; this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService; this.indexSettingsService = indexSettingsService;
this.warmer = warmer;
this.store = store; this.store = store;
this.deletionPolicy = deletionPolicy; this.deletionPolicy = deletionPolicy;
this.translog = translog; this.translog = translog;
@ -713,7 +719,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public Searcher searcher() throws EngineException { public Searcher searcher() throws EngineException {
SearcherManager manager = this.searcherManager; SearcherManager manager = this.searcherManager;
IndexSearcher searcher = manager.acquire(); IndexSearcher searcher = manager.acquire();
return new RobinSearchResult(searcher, manager); return new RobinSearcher(searcher, manager);
} }
@Override @Override
@ -1353,12 +1359,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
return new SearcherManager(indexWriter, true, searcherFactory); return new SearcherManager(indexWriter, true, searcherFactory);
} }
static class RobinSearchResult implements Searcher { static class RobinSearcher implements Searcher {
private final IndexSearcher searcher; private final IndexSearcher searcher;
private final SearcherManager manager; private final SearcherManager manager;
private RobinSearchResult(IndexSearcher searcher, SearcherManager manager) { private RobinSearcher(IndexSearcher searcher, SearcherManager manager) {
this.searcher = searcher; this.searcher = searcher;
this.manager = manager; this.manager = manager;
} }
@ -1420,6 +1426,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public IndexSearcher newSearcher(IndexReader reader) throws IOException { public IndexSearcher newSearcher(IndexReader reader) throws IOException {
ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader); ExtendedIndexSearcher searcher = new ExtendedIndexSearcher(reader);
searcher.setSimilarity(similarityService.defaultSearchSimilarity()); searcher.setSimilarity(similarityService.defaultSearchSimilarity());
if (warmer != null) {
warmer.warm(shardId, new SimpleSearcher(searcher));
}
return searcher; return searcher;
} }
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.jmx.JmxService; import org.elasticsearch.jmx.JmxService;
/** /**
@ -46,5 +47,6 @@ public class IndexShardModule extends AbstractModule {
if (JmxService.shouldExport(settings)) { if (JmxService.shouldExport(settings)) {
bind(IndexShardManagement.class).asEagerSingleton(); bind(IndexShardManagement.class).asEagerSingleton();
} }
bind(ShardIndexWarmerService.class).asEagerSingleton();
} }
} }

View File

@ -40,6 +40,8 @@ import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
/** /**
* *
@ -52,6 +54,8 @@ public interface IndexShard extends IndexShardComponent {
ShardSearchService searchService(); ShardSearchService searchService();
ShardIndexWarmerService warmerService();
ShardRouting routingEntry(); ShardRouting routingEntry();
DocsStats docStats(); DocsStats docStats();
@ -70,6 +74,8 @@ public interface IndexShard extends IndexShardComponent {
FlushStats flushStats(); FlushStats flushStats();
WarmerStats warmerStats();
IndexShardState state(); IndexShardState state();
Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException; Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;

View File

@ -65,6 +65,8 @@ import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryStatus; import org.elasticsearch.indices.recovery.RecoveryStatus;
@ -111,6 +113,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final ShardGetService getService; private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
private final Object mutex = new Object(); private final Object mutex = new Object();
private final String checkIndexOnStartup; private final String checkIndexOnStartup;
@ -137,7 +141,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Inject @Inject
public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService) { ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService; this.indexSettingsService = indexSettingsService;
@ -153,6 +157,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.indexingService = indexingService; this.indexingService = indexingService;
this.getService = getService.setIndexShard(this); this.getService = getService.setIndexShard(this);
this.searchService = searchService; this.searchService = searchService;
this.shardWarmerService = shardWarmerService;
state = IndexShardState.CREATED; state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval())); this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval()));
@ -195,6 +200,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.searchService; return this.searchService;
} }
@Override
public ShardIndexWarmerService warmerService() {
return this.shardWarmerService;
}
@Override @Override
public ShardRouting routingEntry() { public ShardRouting routingEntry() {
return this.shardRouting; return this.shardRouting;
@ -495,6 +505,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return mergeScheduler.stats(); return mergeScheduler.stats();
} }
@Override
public WarmerStats warmerStats() {
return shardWarmerService.stats();
}
@Override @Override
public void flush(Engine.Flush flush) throws ElasticSearchException { public void flush(Engine.Flush flush) throws ElasticSearchException {
// we allows flush while recovering, since we allow for operations to happen // we allows flush while recovering, since we allow for operations to happen

View File

@ -0,0 +1,57 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.warmer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.concurrent.TimeUnit;
/**
*/
public class ShardIndexWarmerService extends AbstractIndexShardComponent {
private final CounterMetric current = new CounterMetric();
private final MeanMetric warmerMetric = new MeanMetric();
@Inject
public ShardIndexWarmerService(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
public void onPreWarm() {
current.inc();
}
public void onPostWarm(long tookInNanos) {
current.dec();
warmerMetric.inc(tookInNanos);
}
public WarmerStats stats() {
return new WarmerStats(current.count(), warmerMetric.count(), TimeUnit.NANOSECONDS.toMillis(warmerMetric.sum()));
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.warmer;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
public class WarmerStats implements Streamable, ToXContent {
private long current;
private long total;
private long totalTimeInMillis;
public WarmerStats() {
}
public WarmerStats(long current, long total, long totalTimeInMillis) {
this.current = current;
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
}
public void add(long current, long total, long totalTimeInMillis) {
this.current += current;
this.total += total;
this.totalTimeInMillis += totalTimeInMillis;
}
public void add(WarmerStats warmerStats) {
if (warmerStats == null) {
return;
}
this.current += warmerStats.current;
this.total += warmerStats.total;
this.totalTimeInMillis += warmerStats.totalTimeInMillis;
}
public long current() {
return this.current;
}
/**
* The total number of warmer executed.
*/
public long total() {
return this.total;
}
/**
* The total time warmer have been executed (in milliseconds).
*/
public long totalTimeInMillis() {
return this.totalTimeInMillis;
}
/**
* The total time warmer have been executed.
*/
public TimeValue totalTime() {
return new TimeValue(totalTimeInMillis);
}
public static WarmerStats readWarmerStats(StreamInput in) throws IOException {
WarmerStats refreshStats = new WarmerStats();
refreshStats.readFrom(in);
return refreshStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.WARMER);
builder.field(Fields.CURRENT, current);
builder.field(Fields.TOTAL, total);
builder.field(Fields.TOTAL_TIME, totalTime().toString());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString WARMER = new XContentBuilderString("warmer");
static final XContentBuilderString CURRENT = new XContentBuilderString("current");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
}
@Override
public void readFrom(StreamInput in) throws IOException {
current = in.readVLong();
total = in.readVLong();
totalTimeInMillis = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(current);
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
}
}

View File

@ -34,6 +34,8 @@ import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
/** /**
* *
@ -66,5 +68,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesFilterCache.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
} }
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.warmer;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
/**
*/
public interface IndicesWarmer {
static interface Listener {
void warm(ShardId shardId, IndexMetaData indexMetaData, Engine.Searcher search);
}
void addListener(Listener listener);
void removeListener(Listener listener);
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.warmer;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
/**
*/
public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject
public InternalIndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
}
@Override
public void addListener(Listener listener) {
listeners.add(listener);
}
@Override
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public void warm(ShardId shardId, Engine.Searcher searcher) {
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name());
if (indexMetaData == null) {
return;
}
if (!indexMetaData.settings().getAsBoolean("index.warm.enabled", true)) {
return;
}
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService == null) {
return;
}
IndexShard indexShard = indexService.shard(shardId.id());
if (indexShard == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] warming [{}]", shardId.index().name(), shardId.id(), searcher.reader());
}
indexShard.warmerService().onPreWarm();
long time = System.nanoTime();
for (Listener listener : listeners) {
try {
listener.warm(shardId, indexMetaData, searcher);
} catch (Exception e) {
logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener);
}
}
long took = System.nanoTime() - time;
indexShard.warmerService().onPostWarm(took);
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] warming took [{}]", shardId.index().name(), shardId.id(), new TimeValue(took, TimeUnit.NANOSECONDS));
}
}
}

View File

@ -73,6 +73,9 @@ public class RestIndicesStatsAction extends BaseRestHandler {
controller.registerHandler(GET, "/_stats/flush", new RestFlushStatsHandler()); controller.registerHandler(GET, "/_stats/flush", new RestFlushStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/flush", new RestFlushStatsHandler()); controller.registerHandler(GET, "/{index}/_stats/flush", new RestFlushStatsHandler());
controller.registerHandler(GET, "/_stats/warmer", new RestWarmerStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/warmer", new RestWarmerStatsHandler());
} }
@Override @Override
@ -99,6 +102,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
indicesStatsRequest.merge(request.paramAsBoolean("merge", indicesStatsRequest.merge())); indicesStatsRequest.merge(request.paramAsBoolean("merge", indicesStatsRequest.merge()));
indicesStatsRequest.refresh(request.paramAsBoolean("refresh", indicesStatsRequest.refresh())); indicesStatsRequest.refresh(request.paramAsBoolean("refresh", indicesStatsRequest.refresh()));
indicesStatsRequest.flush(request.paramAsBoolean("flush", indicesStatsRequest.flush())); indicesStatsRequest.flush(request.paramAsBoolean("flush", indicesStatsRequest.flush()));
indicesStatsRequest.warmer(request.paramAsBoolean("warmer", indicesStatsRequest.warmer()));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStats>() { client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStats>() {
@Override @Override
@ -397,6 +401,43 @@ public class RestIndicesStatsAction extends BaseRestHandler {
} }
} }
class RestWarmerStatsHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear().warmer(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStats>() {
@Override
public void onResponse(IndicesStats response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
class RestRefreshStatsHandler implements RestHandler { class RestRefreshStatsHandler implements RestHandler {
@Override @Override

View File

@ -21,13 +21,13 @@ package org.elasticsearch.test.unit.index.engine.robin;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bloom.none.NoneBloomCache; import org.elasticsearch.index.cache.bloom.none.NoneBloomCache;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine; import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
@ -38,7 +38,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store, Translog translog) { protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
} }
} }