Node Stats: Certain indices level stats to retain stats even when shard relocates, closes #1276.
This commit is contained in:
parent
ee585ad96c
commit
d48542a979
|
@ -152,11 +152,15 @@ public class IndexingStats implements Streamable, ToXContent {
|
|||
}
|
||||
|
||||
public void add(IndexingStats indexingStats) {
|
||||
add(indexingStats, true);
|
||||
}
|
||||
|
||||
public void add(IndexingStats indexingStats, boolean includeTypes) {
|
||||
if (indexingStats == null) {
|
||||
return;
|
||||
}
|
||||
totalStats.add(indexingStats.totalStats);
|
||||
if (indexingStats.typeStats != null && !indexingStats.typeStats.isEmpty()) {
|
||||
if (includeTypes && indexingStats.typeStats != null && !indexingStats.typeStats.isEmpty()) {
|
||||
if (typeStats == null) {
|
||||
typeStats = new HashMap<String, Stats>(indexingStats.typeStats.size());
|
||||
}
|
||||
|
|
|
@ -38,7 +38,12 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
|
|||
*/
|
||||
public boolean changesAllowed();
|
||||
|
||||
NodeIndicesStats stats();
|
||||
/**
|
||||
* Returns the node stats indices stats. The <tt>includePrevious</tt> flag controls
|
||||
* if old shards stats will be aggregated as well (only for relevant stats, such as
|
||||
* refresh and indexing, not for docs/store).
|
||||
*/
|
||||
NodeIndicesStats stats(boolean includePrevious);
|
||||
|
||||
boolean hasIndex(String index);
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.elasticsearch.index.service.IndexService;
|
|||
import org.elasticsearch.index.service.InternalIndexService;
|
||||
import org.elasticsearch.index.settings.IndexSettingsModule;
|
||||
import org.elasticsearch.index.shard.DocsStats;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.similarity.SimilarityModule;
|
||||
import org.elasticsearch.index.store.IndexStoreModule;
|
||||
|
@ -115,6 +116,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
|
||||
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
|
||||
|
||||
private final OldShardsStats oldShardsStats = new OldShardsStats();
|
||||
|
||||
@Inject public InternalIndicesService(Settings settings, NodeEnvironment nodeEnv, ThreadPool threadPool, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
|
||||
super(settings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
|
@ -125,6 +128,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
this.injector = injector;
|
||||
|
||||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
|
||||
this.indicesLifecycle.addListener(oldShardsStats);
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
|
@ -169,7 +174,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
return this.indicesLifecycle;
|
||||
}
|
||||
|
||||
@Override public NodeIndicesStats stats() {
|
||||
@Override public NodeIndicesStats stats(boolean includePrevious) {
|
||||
DocsStats docsStats = new DocsStats();
|
||||
StoreStats storeStats = new StoreStats();
|
||||
IndexingStats indexingStats = new IndexingStats();
|
||||
|
@ -177,6 +182,14 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
MergeStats mergeStats = new MergeStats();
|
||||
RefreshStats refreshStats = new RefreshStats();
|
||||
FlushStats flushStats = new FlushStats();
|
||||
|
||||
if (includePrevious) {
|
||||
indexingStats.add(oldShardsStats.indexingStats);
|
||||
mergeStats.add(oldShardsStats.mergeStats);
|
||||
refreshStats.add(oldShardsStats.refreshStats);
|
||||
flushStats.add(oldShardsStats.flushStats);
|
||||
}
|
||||
|
||||
for (IndexService indexService : indices.values()) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
storeStats.add(indexShard.storeStats());
|
||||
|
@ -332,4 +345,21 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
FileSystemUtils.deleteRecursively(nodeEnv.indexLocation(new Index(index)));
|
||||
}
|
||||
}
|
||||
|
||||
static class OldShardsStats extends IndicesLifecycle.Listener {
|
||||
|
||||
final IndexingStats indexingStats = new IndexingStats();
|
||||
final MergeStats mergeStats = new MergeStats();
|
||||
final RefreshStats refreshStats = new RefreshStats();
|
||||
final FlushStats flushStats = new FlushStats();
|
||||
|
||||
@Override public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
|
||||
if (indexShard != null) {
|
||||
indexingStats.add(indexShard.indexingStats(), false);
|
||||
mergeStats.add(indexShard.mergeStats());
|
||||
refreshStats.add(indexShard.refreshStats());
|
||||
flushStats.add(indexShard.flushStats());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -77,7 +77,9 @@ public class NodeService extends AbstractComponent {
|
|||
}
|
||||
|
||||
public NodeStats stats() {
|
||||
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(),
|
||||
// for indices stats we want to include previous allocated shards stats as well (it will
|
||||
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
|
||||
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(true),
|
||||
monitorService.osService().stats(), monitorService.processService().stats(),
|
||||
monitorService.jvmService().stats(), monitorService.networkService().stats(),
|
||||
transportService.stats(), httpServer == null ? null : httpServer.stats());
|
||||
|
|
Loading…
Reference in New Issue