[Monitoring] Align indices/index stats with local cluster state (elastic/x-pack-elasticsearch#4079)

A small bug in the `IndexStatsCollector` can potentially returns
statistics for newly created indices that does not exist yet in the
collector's `ClusterState` local instance.

It happens because an instance of the current `ClusterState` is
captured and passed to all the collectors before they are executed (so
that they all share the same view of the state of the cluster). On
some clusters, if an index is created after the `ClusterState` is
captured but before the `IndicesStatsRequest` is executed then it can
appears in the index stats but have no corresponding entry in the
local cluster state.

This commit changes the IndexStatsCollector so that it only return
statistics for indices that already exist in the cluster state. This
way a consistent view is possible between indices/index/shard stats.

Original commit: elastic/x-pack-elasticsearch@da173ae0b0
This commit is contained in:
Tanguy Leroux 2018-03-12 10:32:54 +01:00 committed by GitHub
parent 41af46688a
commit 15ab4af157
5 changed files with 120 additions and 75 deletions

View File

@ -1,21 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import java.util.List;
public class IndicesStatsResponseTestUtils {
/**
* Gives access to package private IndicesStatsResponse constructor for test purpose.
**/
public static IndicesStatsResponse newIndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards,
List<DefaultShardOperationFailedException> shardFailures) {
return new IndicesStatsResponse(shards, totalShards, successfulShards, failedShards, shardFailures);
}
}

View File

@ -10,8 +10,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -58,7 +58,7 @@ public class IndexStatsCollector extends Collector {
final long interval, final long interval,
final ClusterState clusterState) throws Exception { final ClusterState clusterState) throws Exception {
final List<MonitoringDoc> results = new ArrayList<>(); final List<MonitoringDoc> results = new ArrayList<>();
final IndicesStatsResponse indicesStats = client.admin().indices().prepareStats() final IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats()
.setIndices(getCollectionIndices()) .setIndices(getCollectionIndices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setIndicesOptions(IndicesOptions.lenientExpandOpen())
.clear() .clear()
@ -76,18 +76,23 @@ public class IndexStatsCollector extends Collector {
final long timestamp = timestamp(); final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState); final String clusterUuid = clusterUuid(clusterState);
final MetaData metadata = clusterState.metaData();
final RoutingTable routingTable = clusterState.routingTable();
// add the indices stats that we use to collect the index stats // Filters the indices stats to only return the statistics for the indices known by the collector's
results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats)); // local cluster state. This way indices/index/shards stats all share a common view of indices state.
final List<IndexStats> indicesStats = new ArrayList<>();
for (final String indexName : metadata.getConcreteAllIndices()) {
final IndexStats indexStats = indicesStatsResponse.getIndex(indexName);
if (indexStats != null) {
// The index appears both in the local cluster state and indices stats response
indicesStats.add(indexStats);
// collect each index stats document results.add(new IndexStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indexStats,
for (final IndexStats indexStats : indicesStats.getIndices().values()) { metadata.index(indexName), routingTable.index(indexName)));
final String index = indexStats.getIndex(); }
final IndexMetaData metaData = clusterState.metaData().index(index);
final IndexRoutingTable routingTable = clusterState.routingTable().index(index);
results.add(new IndexStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indexStats, metaData, routingTable));
} }
results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats));
return Collections.unmodifiableCollection(results); return Collections.unmodifiableCollection(results);
} }

View File

@ -5,7 +5,9 @@
*/ */
package org.elasticsearch.xpack.monitoring.collector.indices; package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
@ -13,6 +15,7 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.FilteredMonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.FilteredMonitoringDoc;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -23,25 +26,68 @@ public class IndicesStatsMonitoringDoc extends FilteredMonitoringDoc {
public static final String TYPE = "indices_stats"; public static final String TYPE = "indices_stats";
private final IndicesStatsResponse indicesStats; private final List<IndexStats> indicesStats;
IndicesStatsMonitoringDoc(final String cluster, IndicesStatsMonitoringDoc(final String cluster,
final long timestamp, final long timestamp,
final long intervalMillis, final long intervalMillis,
final MonitoringDoc.Node node, final MonitoringDoc.Node node,
final IndicesStatsResponse indicesStats) { final List<IndexStats> indicesStats) {
super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null, XCONTENT_FILTERS); super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null, XCONTENT_FILTERS);
this.indicesStats = Objects.requireNonNull(indicesStats); this.indicesStats = Objects.requireNonNull(indicesStats);
} }
IndicesStatsResponse getIndicesStats() { List<IndexStats> getIndicesStats() {
return indicesStats; return indicesStats;
} }
@Override @Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
final CommonStats total = new CommonStats();
final CommonStats primaries = new CommonStats();
for (IndexStats indexStats : getIndicesStats()) {
final ShardStats[] shardsStats = indexStats.getShards();
if (shardsStats != null) {
for (ShardStats shard : indexStats.getShards()) {
total.add(shard.getStats());
if (shard.getShardRouting().primary()) {
primaries.add(shard.getStats());
}
}
}
}
builder.startObject(TYPE); builder.startObject(TYPE);
indicesStats.toXContent(builder, params); {
builder.startObject("_all");
{
builder.startObject("primaries");
primaries.toXContent(builder, params);
builder.endObject();
builder.startObject("total");
total.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
builder.startObject("indices");
for (IndexStats indexStats : getIndicesStats()) {
builder.startObject(indexStats.getIndex());
builder.startObject("primaries");
indexStats.getPrimaries().toXContent(builder, params);
builder.endObject();
builder.startObject("total");
indexStats.getTotal().toXContent(builder, params);
builder.endObject();
builder.endObject();
}
builder.endObject();
}
builder.endObject(); builder.endObject();
} }

View File

@ -23,7 +23,9 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
import java.util.Collection; import java.util.Collection;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -32,6 +34,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -86,9 +89,18 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
final RoutingTable routingTable = mock(RoutingTable.class); final RoutingTable routingTable = mock(RoutingTable.class);
when(clusterState.routingTable()).thenReturn(routingTable); when(clusterState.routingTable()).thenReturn(routingTable);
final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
final MonitoringDoc.Node node = randomMonitoringNode(random()); final MonitoringDoc.Node node = randomMonitoringNode(random());
final int indices = randomIntBetween(0, 10); // Number of indices that exist in the cluster state and returned in the IndicesStatsResponse
final int existingIndices = randomIntBetween(0, 10);
// Number of indices returned in the IndicesStatsResponse only
final int createdIndices = randomIntBetween(0, 10);
// Number of indices returned in the local cluster state only
final int deletedIndices = randomIntBetween(0, 10);
// Total number of indices
final int indices = existingIndices + createdIndices + deletedIndices;
final Map<String, IndexStats> indicesStats = new HashMap<>(indices); final Map<String, IndexStats> indicesStats = new HashMap<>(indices);
final Map<String, IndexMetaData> indicesMetaData = new HashMap<>(indices); final Map<String, IndexMetaData> indicesMetaData = new HashMap<>(indices);
final Map<String, IndexRoutingTable> indicesRoutingTable = new HashMap<>(indices); final Map<String, IndexRoutingTable> indicesRoutingTable = new HashMap<>(indices);
@ -96,20 +108,29 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
for (int i = 0; i < indices; i++) { for (int i = 0; i < indices; i++) {
final String index = "_index_" + i; final String index = "_index_" + i;
final IndexStats indexStats = mock(IndexStats.class); final IndexStats indexStats = mock(IndexStats.class);
when(indexStats.getIndex()).thenReturn(index);
final IndexMetaData indexMetaData = mock(IndexMetaData.class); final IndexMetaData indexMetaData = mock(IndexMetaData.class);
final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class); final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class);
indicesStats.put(index, indexStats); if (i < (createdIndices + existingIndices)) {
indicesMetaData.put(index, indexMetaData); when(indicesStatsResponse.getIndex(index)).thenReturn(indexStats);
indicesRoutingTable.put(index, indexRoutingTable); }
if (i >= createdIndices) {
indicesMetaData.put(index, indexMetaData);
when(metaData.index(index)).thenReturn(indexMetaData);
when(indexStats.getIndex()).thenReturn(index); indicesRoutingTable.put(index, indexRoutingTable);
when(metaData.index(index)).thenReturn(indexMetaData); when(routingTable.index(index)).thenReturn(indexRoutingTable);
when(routingTable.index(index)).thenReturn(indexRoutingTable);
if (i < (createdIndices + existingIndices)) {
indicesStats.put(index, indexStats);
}
}
} }
final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); final String[] indexNames = indicesMetaData.keySet().toArray(new String[0]);
when(indicesStatsResponse.getIndices()).thenReturn(indicesStats); when(metaData.getConcreteAllIndices()).thenReturn(indexNames);
final IndicesStatsRequestBuilder indicesStatsRequestBuilder = final IndicesStatsRequestBuilder indicesStatsRequestBuilder =
spy(new IndicesStatsRequestBuilder(mock(ElasticsearchClient.class), IndicesStatsAction.INSTANCE)); spy(new IndicesStatsRequestBuilder(mock(ElasticsearchClient.class), IndicesStatsAction.INSTANCE));
@ -131,11 +152,13 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState); final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
verify(indicesAdminClient).prepareStats(); verify(indicesAdminClient).prepareStats();
verify(clusterState, times(1 + indices)).metaData();
verify(clusterState, times(indices)).routingTable(); verify(indicesStatsResponse, times(existingIndices + deletedIndices)).getIndex(anyString());
verify(metaData, times(existingIndices)).index(anyString());
verify(routingTable, times(existingIndices)).index(anyString());
verify(metaData).clusterUUID(); verify(metaData).clusterUUID();
assertEquals(1 + indices, results.size()); assertEquals(1 + existingIndices, results.size());
for (final MonitoringDoc document : results) { for (final MonitoringDoc document : results) {
assertThat(document.getCluster(), equalTo(clusterUUID)); assertThat(document.getCluster(), equalTo(clusterUUID));
@ -147,7 +170,9 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
if (document instanceof IndicesStatsMonitoringDoc) { if (document instanceof IndicesStatsMonitoringDoc) {
assertThat(document.getType(), equalTo(IndicesStatsMonitoringDoc.TYPE)); assertThat(document.getType(), equalTo(IndicesStatsMonitoringDoc.TYPE));
assertThat(((IndicesStatsMonitoringDoc) document).getIndicesStats(), is(indicesStatsResponse)); final List<IndexStats> actualIndicesStats = ((IndicesStatsMonitoringDoc) document).getIndicesStats();
actualIndicesStats.forEach((value) -> assertThat(value, is(indicesStats.get(value.getIndex()))));
assertThat(actualIndicesStats.size(), equalTo(indicesStats.size()));
} else { } else {
assertThat(document.getType(), equalTo(IndexStatsMonitoringDoc.TYPE)); assertThat(document.getType(), equalTo(IndexStatsMonitoringDoc.TYPE));

View File

@ -7,13 +7,11 @@ package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.SearchStats;
@ -29,35 +27,37 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Set; import java.util.Set;
import static java.util.Collections.emptyList;
import static org.elasticsearch.action.admin.indices.stats.IndicesStatsResponseTestUtils.newIndicesStatsResponse;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCase<IndicesStatsMonitoringDoc> { public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCase<IndicesStatsMonitoringDoc> {
private IndicesStatsResponse indicesStatsResponse; private List<IndexStats> indicesStats;
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
indicesStatsResponse = mock(IndicesStatsResponse.class); indicesStats = Collections.singletonList(new IndexStats("index-0", new ShardStats[] {
when(indicesStatsResponse.toXContent(any(XContentBuilder.class), any(ToXContent.Params.class))).thenCallRealMethod(); // Primaries
when(indicesStatsResponse.getPrimaries()).thenReturn(mockCommonStats()); new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null),
when(indicesStatsResponse.getTotal()).thenReturn(mockCommonStats()); new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null),
// Replica
new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null)
}));
} }
@Override @Override
protected IndicesStatsMonitoringDoc createMonitoringDoc(String cluster, long timestamp, long interval, MonitoringDoc.Node node, protected IndicesStatsMonitoringDoc createMonitoringDoc(String cluster, long timestamp, long interval, MonitoringDoc.Node node,
MonitoredSystem system, String type, String id) { MonitoredSystem system, String type, String id) {
return new IndicesStatsMonitoringDoc(cluster, timestamp, interval, node, indicesStatsResponse); return new IndicesStatsMonitoringDoc(cluster, timestamp, interval, node, indicesStats);
} }
@Override @Override
@ -66,7 +66,7 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes
assertThat(document.getType(), is(IndicesStatsMonitoringDoc.TYPE)); assertThat(document.getType(), is(IndicesStatsMonitoringDoc.TYPE));
assertThat(document.getId(), nullValue()); assertThat(document.getId(), nullValue());
assertThat(document.getIndicesStats(), is(indicesStatsResponse)); assertThat(document.getIndicesStats(), is(indicesStats));
} }
@Override @Override
@ -81,18 +81,8 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes
@Override @Override
public void testToXContent() throws IOException { public void testToXContent() throws IOException {
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
final ShardStats[] shards = new ShardStats[] {
// Primaries
new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null),
new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null),
// Replica
new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null)
};
final IndicesStatsResponse indicesStatsResponse = newIndicesStatsResponse(shards, -1, -1, -1, emptyList());
final IndicesStatsMonitoringDoc document = final IndicesStatsMonitoringDoc document =
new IndicesStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, indicesStatsResponse); new IndicesStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, indicesStats);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false); final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertEquals("{" assertEquals("{"