[7.x] Add ingest info to Cluster Stats (#48485) (#48661)

* Add ingest info to Cluster Stats (#48485)

This commit enhances the ClusterStatsNodes response to include global
processor usage stats on a per-processor basis.

example output:

```
...
    "processor_stats": {
      "gsub": {
        "count": 0,
        "failed": 0
        "current": 0
        "time_in_millis": 0
      },
      "script": {
        "count": 0,
        "failed": 0
        "current": 0,
        "time_in_millis": 0
      }
    }
...
```

The purpose of this enhancement is to make it easier to collect stats on how specific processors are being used across the cluster, beyond the per-node usage statistics that already exist in node stats.

Closes #46146.

* fix BWC of ingest stats

The introduction of processor types into IngestStats had a bug.
The processor type was left `null` and then used as the key to the map, which
would throw an NPE. This commit resolves this by setting the processor type
for all entries deserialized from previous versions (which do not serialize
the type) to `_NOT_AVAILABLE`.
This commit is contained in:
Tal Levy 2019-10-31 14:36:54 -07:00 committed by GitHub
parent d0ead688c3
commit 4be54402de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 181 additions and 15 deletions

View File

@ -227,6 +227,12 @@ The API returns the following response:
},
...
],
"ingest": {
"number_of_pipelines" : 1,
"processor_stats": {
...
}
},
"network_types": {
...
},
@ -244,6 +250,7 @@ The API returns the following response:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]

View File

@ -49,7 +49,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ClusterStatsNodes implements ToXContentFragment {
@ -64,6 +66,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final NetworkTypes networkTypes;
private final DiscoveryTypes discoveryTypes;
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;
ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
@ -97,6 +100,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
}
public Counts getCounts() {
@ -178,6 +182,9 @@ public class ClusterStatsNodes implements ToXContentFragment {
discoveryTypes.toXContent(builder, params);
packagingTypes.toXContent(builder, params);
ingestStats.toXContent(builder, params);
return builder;
}
@ -690,4 +697,68 @@ public class ClusterStatsNodes implements ToXContentFragment {
}
static class IngestStats implements ToXContentFragment {
final int pipelineCount;
final SortedMap<String, long[]> stats;
IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String,
List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat.getIngestStats()
.getProcessorStats().entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis()
};
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
return v;
}
});
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.stats = Collections.unmodifiableSortedMap(stats);
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("ingest");
{
builder.field("number_of_pipelines", pipelineCount);
builder.startObject("processor_stats");
for (Map.Entry<String, long[]> stat : stats.entrySet()) {
long[] statValues = stat.getValue();
builder.startObject(stat.getKey());
builder.field("count", statValues[0]);
builder.field("failed", statValues[1]);
builder.field("current", statValues[2]);
builder.humanReadableField("time_in_millis", "time",
new TimeValue(statValues[3], TimeUnit.MILLISECONDS));
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
}

View File

@ -94,7 +94,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {

View File

@ -413,7 +413,7 @@ public class IngestService implements ClusterStateApplier {
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
return statsBuilder.build();

View File

@ -69,8 +69,12 @@ public class IngestStats implements Writeable, ToXContentFragment {
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
String processorType = "_NOT_AVAILABLE";
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
processorType = in.readString();
}
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
@ -92,6 +96,9 @@ public class IngestStats implements Writeable, ToXContentFragment {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeString(processorStat.getType());
}
processorStat.getStats().writeTo(out);
}
}
@ -115,9 +122,12 @@ public class IngestStats implements Writeable, ToXContentFragment {
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
@ -229,9 +239,9 @@ public class IngestStats implements Writeable, ToXContentFragment {
return this;
}
Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
.add(new ProcessorStat(processorName, processorType, metric.createStats()));
return this;
}
@ -267,10 +277,12 @@ public class IngestStats implements Writeable, ToXContentFragment {
*/
public static class ProcessorStat {
private final String name;
private final String type;
private final Stats stats;
public ProcessorStat(String name, Stats stats) {
public ProcessorStat(String name, String type, Stats stats) {
this.name = name;
this.type = type;
this.stats = stats;
}
@ -278,6 +290,10 @@ public class IngestStats implements Writeable, ToXContentFragment {
return name;
}
public String getType() {
return type;
}
public Stats getStats() {
return stats;
}

View File

@ -315,7 +315,7 @@ public class NodeStatsTests extends ESTestCase {
}
}
private static NodeStats createNodeStats() {
public static NodeStats createNodeStats() {
DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), VersionUtils.randomVersion(random()));
OsStats osStats = null;
@ -456,7 +456,8 @@ public class NodeStatsTests extends ESTestCase {
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10),
randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@ -27,11 +29,17 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.Matchers.equalTo;
public class ClusterStatsNodesTests extends ESTestCase {
@ -59,6 +67,41 @@ public class ClusterStatsNodesTests extends ESTestCase {
+ "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString());
}
public void testIngestStats() throws Exception {
    // A NodeStats instance that is guaranteed to carry ingest statistics.
    NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats);

    // Build the expected per-processor-type totals. Use merge (not put) so that two
    // randomly-named processors that happen to share the same type are summed, exactly
    // as ClusterStatsNodes.IngestStats aggregates them; a plain put would overwrite
    // the earlier entry and make this test flaky on (rare) type collisions.
    SortedMap<String, long[]> processorStats = new TreeMap<>();
    nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s ->
        processorStats.merge(s.getType(),
            new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(),
                s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis() },
            (v1, v2) -> new long[] { v1[0] + v2[0], v1[1] + v2[1], v1[2] + v2[2], v1[3] + v2[3] })));

    ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats));
    // pipelineCount counts distinct pipeline ids, i.e. the processor-stats map keys.
    assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size()));

    // Render the expected "processor_stats" JSON object in sorted (TreeMap) key order.
    StringBuilder processorStatsString = new StringBuilder("{");
    Iterator<Map.Entry<String, long[]>> iter = processorStats.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, long[]> entry = iter.next();
        long[] statValues = entry.getValue();
        processorStatsString.append("\"").append(entry.getKey())
            .append("\":{\"count\":").append(statValues[0])
            .append(",\"failed\":").append(statValues[1])
            .append(",\"current\":").append(statValues[2])
            .append(",\"time_in_millis\":").append(statValues[3])
            .append("}");
        if (iter.hasNext()) {
            processorStatsString.append(",");
        }
    }
    processorStatsString.append("}");
    assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo(
        "{\"ingest\":{"
            + "\"number_of_pipelines\":" + stats.pipelineCount + ","
            + "\"processor_stats\":" + processorStatsString
            + "}}"));
}
private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {

View File

@ -42,7 +42,7 @@ public class IngestStatsTests extends ESTestCase {
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats);
IngestStats serializedStats = serialize(ingestStats);
assertIngestStats(ingestStats, serializedStats, true);
assertIngestStats(ingestStats, serializedStats, true, true);
}
public void testReadLegacyStream() throws IOException {
@ -63,7 +63,24 @@ public class IngestStatsTests extends ESTestCase {
in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0));
IngestStats serializedStats = new IngestStats(in);
IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap());
assertIngestStats(expectedStats, serializedStats, false);
assertIngestStats(expectedStats, serializedStats, false, true);
}
public void testBWCIngestProcessorTypeStats() throws IOException {
    // Fixture: ingest stats whose processor stats carry explicit types.
    IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
    List<IngestStats.PipelineStat> pipelineStats = createPipelineStats();
    Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
    IngestStats expectedIngestStats = new IngestStats(totalStats, pipelineStats, processorStats);

    // Round-trip through a stream pinned to the version just before 7.6.0,
    // whose wire format does not include the processor type.
    BytesStreamOutput output = new BytesStreamOutput();
    output.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
    expectedIngestStats.writeTo(output);
    StreamInput input = output.bytes().streamInput();
    input.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
    IngestStats serializedStats = new IngestStats(input);

    // Everything should match except the type, which falls back to _NOT_AVAILABLE
    // (expectProcessorTypes=false makes the assertion check that fallback).
    assertIngestStats(expectedIngestStats, serializedStats, true, false);
}
private List<IngestStats.PipelineStat> createPipelineStats() {
@ -75,9 +92,10 @@ public class IngestStatsTests extends ESTestCase {
private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type",
new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
@ -92,7 +110,8 @@ public class IngestStatsTests extends ESTestCase {
return new IngestStats(in);
}
private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){
private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors,
boolean expectProcessorTypes){
assertNotSame(ingestStats, serializedStats);
assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats());
assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats());
@ -114,6 +133,11 @@ public class IngestStatsTests extends ESTestCase {
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
if (expectProcessorTypes) {
assertEquals(ps.getType(), serializedProcessorStat.getType());
} else {
assertEquals("_NOT_AVAILABLE", serializedProcessorStat.getType());
}
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());

View File

@ -535,7 +535,11 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
+ "\"type\":\"docker\","
+ "\"count\":1"
+ "}"
+ "]"
+ "],"
+ "\"ingest\":{"
+ "\"number_of_pipelines\":0,"
+ "\"processor_stats\":{}"
+ "}"
+ "}"
+ "},"
+ "\"cluster_state\":{"