From 15ab4af157cddbd7bcd35e6ddfd4442ed2ce8975 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 12 Mar 2018 10:32:54 +0100 Subject: [PATCH] [Monitoring] Align indices/index stats with local cluster state (elastic/x-pack-elasticsearch#4079) A small bug in the `IndexStatsCollector` can potentially returns statistics for newly created indices that does not exist yet in the collector's `ClusterState` local instance. It happens because an instance of the current `ClusterState` is captured and passed to all the collectors before they are executed (so that they all share the same view of the state of the cluster). On some clusters, if an index is created after the `ClusterState` is captured but before the `IndicesStatsRequest` is executed then it can appears in the index stats but have no corresponding entry in the local cluster state. This commit changes the IndexStatsCollector so that it only return statistics for indices that already exist in the cluster state. This way a consistent view is possible between indices/index/shard stats. Original commit: elastic/x-pack-elasticsearch@da173ae0b0985a44fe09c810c864f07da192624d --- .../stats/IndicesStatsResponseTestUtils.java | 21 ------- .../indices/IndexStatsCollector.java | 29 ++++++---- .../indices/IndicesStatsMonitoringDoc.java | 56 +++++++++++++++++-- .../indices/IndexStatsCollectorTests.java | 51 ++++++++++++----- .../IndicesStatsMonitoringDocTests.java | 38 +++++-------- 5 files changed, 120 insertions(+), 75 deletions(-) delete mode 100644 plugin/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTestUtils.java diff --git a/plugin/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTestUtils.java b/plugin/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTestUtils.java deleted file mode 100644 index 18f4028b6bb..00000000000 --- a/plugin/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTestUtils.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.action.admin.indices.stats; - -import org.elasticsearch.action.support.DefaultShardOperationFailedException; - -import java.util.List; - -public class IndicesStatsResponseTestUtils { - - /** - * Gives access to package private IndicesStatsResponse constructor for test purpose. - **/ - public static IndicesStatsResponse newIndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards, - List shardFailures) { - return new IndicesStatsResponse(shards, totalShards, successfulShards, failedShards, shardFailures); - } -} diff --git a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java index f21f0697a18..b6f50ed9e5c 100644 --- a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java +++ b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java @@ -10,8 +10,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -58,7 +58,7 @@ public class IndexStatsCollector extends Collector { final long interval, final ClusterState clusterState) throws Exception { final List results = new ArrayList<>(); - final IndicesStatsResponse indicesStats = client.admin().indices().prepareStats() + final IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats() .setIndices(getCollectionIndices()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .clear() @@ -76,18 +76,23 @@ public class IndexStatsCollector extends Collector { final long timestamp = timestamp(); final String clusterUuid = clusterUuid(clusterState); + final MetaData metadata = clusterState.metaData(); + final RoutingTable routingTable = clusterState.routingTable(); - // add the indices stats that we use to collect the index stats - results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats)); + // Filters the indices stats to only return the statistics for the indices known by the collector's + // local cluster state. This way indices/index/shards stats all share a common view of indices state. + final List indicesStats = new ArrayList<>(); + for (final String indexName : metadata.getConcreteAllIndices()) { + final IndexStats indexStats = indicesStatsResponse.getIndex(indexName); + if (indexStats != null) { + // The index appears both in the local cluster state and indices stats response + indicesStats.add(indexStats); - // collect each index stats document - for (final IndexStats indexStats : indicesStats.getIndices().values()) { - final String index = indexStats.getIndex(); - final IndexMetaData metaData = clusterState.metaData().index(index); - final IndexRoutingTable routingTable = clusterState.routingTable().index(index); - - results.add(new IndexStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indexStats, metaData, routingTable)); + results.add(new IndexStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indexStats, + metadata.index(indexName), routingTable.index(indexName))); + } } + results.add(new IndicesStatsMonitoringDoc(clusterUuid, timestamp, interval, node, indicesStats)); return Collections.unmodifiableCollection(results); } diff --git a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java index b58fecc9740..6fbf26b7c39 100644 --- a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java +++ b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.monitoring.collector.indices; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; @@ -13,6 +15,7 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.FilteredMonitoringDoc; import java.io.IOException; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -23,25 +26,68 @@ public class IndicesStatsMonitoringDoc extends FilteredMonitoringDoc { public static final String TYPE = "indices_stats"; - private final IndicesStatsResponse indicesStats; + private final List indicesStats; IndicesStatsMonitoringDoc(final String cluster, final long timestamp, final long intervalMillis, final MonitoringDoc.Node node, - final IndicesStatsResponse indicesStats) { + final List indicesStats) { super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null, XCONTENT_FILTERS); this.indicesStats = Objects.requireNonNull(indicesStats); } - IndicesStatsResponse getIndicesStats() { + List getIndicesStats() { return indicesStats; } @Override protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { + final CommonStats total = new CommonStats(); + final CommonStats primaries = new CommonStats(); + + for (IndexStats indexStats : getIndicesStats()) { + final ShardStats[] shardsStats = indexStats.getShards(); + if (shardsStats != null) { + for (ShardStats shard : indexStats.getShards()) { + total.add(shard.getStats()); + if (shard.getShardRouting().primary()) { + primaries.add(shard.getStats()); + } + } + } + } + builder.startObject(TYPE); - indicesStats.toXContent(builder, params); + { + builder.startObject("_all"); + { + builder.startObject("primaries"); + primaries.toXContent(builder, params); + builder.endObject(); + + builder.startObject("total"); + total.toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); + + builder.startObject("indices"); + for (IndexStats indexStats : getIndicesStats()) { + builder.startObject(indexStats.getIndex()); + + builder.startObject("primaries"); + indexStats.getPrimaries().toXContent(builder, params); + builder.endObject(); + + builder.startObject("total"); + indexStats.getTotal().toXContent(builder, params); + builder.endObject(); + + builder.endObject(); + } + builder.endObject(); + } builder.endObject(); } diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java index 39c97e761bc..44db39d8573 100644 --- a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java @@ -23,7 +23,9 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; import java.util.Collection; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -32,6 +34,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -86,9 +89,18 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { final RoutingTable routingTable = mock(RoutingTable.class); when(clusterState.routingTable()).thenReturn(routingTable); + final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); final MonitoringDoc.Node node = randomMonitoringNode(random()); - final int indices = randomIntBetween(0, 10); + // Number of indices that exist in the cluster state and returned in the IndicesStatsResponse + final int existingIndices = randomIntBetween(0, 10); + // Number of indices returned in the IndicesStatsResponse only + final int createdIndices = randomIntBetween(0, 10); + // Number of indices returned in the local cluster state only + final int deletedIndices = randomIntBetween(0, 10); + // Total number of indices + final int indices = existingIndices + createdIndices + deletedIndices; + final Map indicesStats = new HashMap<>(indices); final Map indicesMetaData = new HashMap<>(indices); final Map indicesRoutingTable = new HashMap<>(indices); @@ -96,20 +108,29 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { for (int i = 0; i < indices; i++) { final String index = "_index_" + i; final IndexStats indexStats = mock(IndexStats.class); + when(indexStats.getIndex()).thenReturn(index); + final IndexMetaData indexMetaData = mock(IndexMetaData.class); final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class); - indicesStats.put(index, indexStats); - indicesMetaData.put(index, indexMetaData); - indicesRoutingTable.put(index, indexRoutingTable); + if (i < (createdIndices + existingIndices)) { + when(indicesStatsResponse.getIndex(index)).thenReturn(indexStats); + } + if (i >= createdIndices) { + indicesMetaData.put(index, indexMetaData); + when(metaData.index(index)).thenReturn(indexMetaData); - when(indexStats.getIndex()).thenReturn(index); - when(metaData.index(index)).thenReturn(indexMetaData); - when(routingTable.index(index)).thenReturn(indexRoutingTable); + indicesRoutingTable.put(index, indexRoutingTable); + when(routingTable.index(index)).thenReturn(indexRoutingTable); + + if (i < (createdIndices + existingIndices)) { + indicesStats.put(index, indexStats); + } + } } - final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); - when(indicesStatsResponse.getIndices()).thenReturn(indicesStats); + final String[] indexNames = indicesMetaData.keySet().toArray(new String[0]); + when(metaData.getConcreteAllIndices()).thenReturn(indexNames); final IndicesStatsRequestBuilder indicesStatsRequestBuilder = spy(new IndicesStatsRequestBuilder(mock(ElasticsearchClient.class), IndicesStatsAction.INSTANCE)); @@ -131,11 +152,13 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { final Collection results = collector.doCollect(node, interval, clusterState); verify(indicesAdminClient).prepareStats(); - verify(clusterState, times(1 + indices)).metaData(); - verify(clusterState, times(indices)).routingTable(); + + verify(indicesStatsResponse, times(existingIndices + deletedIndices)).getIndex(anyString()); + verify(metaData, times(existingIndices)).index(anyString()); + verify(routingTable, times(existingIndices)).index(anyString()); verify(metaData).clusterUUID(); - assertEquals(1 + indices, results.size()); + assertEquals(1 + existingIndices, results.size()); for (final MonitoringDoc document : results) { assertThat(document.getCluster(), equalTo(clusterUUID)); @@ -147,7 +170,9 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase { if (document instanceof IndicesStatsMonitoringDoc) { assertThat(document.getType(), equalTo(IndicesStatsMonitoringDoc.TYPE)); - assertThat(((IndicesStatsMonitoringDoc) document).getIndicesStats(), is(indicesStatsResponse)); + final List actualIndicesStats = ((IndicesStatsMonitoringDoc) document).getIndicesStats(); + actualIndicesStats.forEach((value) -> assertThat(value, is(indicesStats.get(value.getIndex())))); + assertThat(actualIndicesStats.size(), equalTo(indicesStats.size())); } else { assertThat(document.getType(), equalTo(IndexStatsMonitoringDoc.TYPE)); diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 896bce5a2d9..86fa89f4c01 100644 --- a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -7,13 +7,11 @@ package org.elasticsearch.xpack.monitoring.collector.indices; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.search.stats.SearchStats; @@ -29,35 +27,37 @@ import org.junit.Before; import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; +import java.util.List; import java.util.Set; -import static java.util.Collections.emptyList; -import static org.elasticsearch.action.admin.indices.stats.IndicesStatsResponseTestUtils.newIndicesStatsResponse; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCase { - private IndicesStatsResponse indicesStatsResponse; + private List indicesStats; @Override @Before public void setUp() throws Exception { super.setUp(); - indicesStatsResponse = mock(IndicesStatsResponse.class); - when(indicesStatsResponse.toXContent(any(XContentBuilder.class), any(ToXContent.Params.class))).thenCallRealMethod(); - when(indicesStatsResponse.getPrimaries()).thenReturn(mockCommonStats()); - when(indicesStatsResponse.getTotal()).thenReturn(mockCommonStats()); + indicesStats = Collections.singletonList(new IndexStats("index-0", new ShardStats[] { + // Primaries + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), + // Replica + new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null) + })); } @Override protected IndicesStatsMonitoringDoc createMonitoringDoc(String cluster, long timestamp, long interval, MonitoringDoc.Node node, MonitoredSystem system, String type, String id) { - return new IndicesStatsMonitoringDoc(cluster, timestamp, interval, node, indicesStatsResponse); + return new IndicesStatsMonitoringDoc(cluster, timestamp, interval, node, indicesStats); } @Override @@ -66,7 +66,7 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes assertThat(document.getType(), is(IndicesStatsMonitoringDoc.TYPE)); assertThat(document.getId(), nullValue()); - assertThat(document.getIndicesStats(), is(indicesStatsResponse)); + assertThat(document.getIndicesStats(), is(indicesStats)); } @Override @@ -81,18 +81,8 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes @Override public void testToXContent() throws IOException { final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); - - final ShardStats[] shards = new ShardStats[] { - // Primaries - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), - // Replica - new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null) - }; - final IndicesStatsResponse indicesStatsResponse = newIndicesStatsResponse(shards, -1, -1, -1, emptyList()); - final IndicesStatsMonitoringDoc document = - new IndicesStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, indicesStatsResponse); + new IndicesStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, indicesStats); final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false); assertEquals("{"