ingest: processor stats (#34724)

This change introduces stats per processors. Total, time, failed,
current are currently supported. All pipelines will now show all
top level processors that belong to it. Failure processors are not
displayed, however, the time taken to execute the failure chain is part
of the stats for the top level processor.

The processor name is the type of the processor, ordered as defined in
the pipeline. If a tag for the processor is found, then the tag is
appended to the type.

Pipeline processors will have the pipeline name appended to the name of
the name of the processors (before the tag if one exists). If more
then one pipeline is used to process the document, then each pipeline
will carry its own stats. The outer most pipeline will also include the
inner most pipeline stats.

Conditional processors will only included in the stats if the condition evaluates
to true.
This commit is contained in:
Jake Landis 2018-10-23 07:30:52 -05:00 committed by GitHub
parent 4a8386f271
commit ad94e79350
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 696 additions and 150 deletions

View File

@ -170,8 +170,8 @@ task verifyVersions {
* the enabled state of every bwc task. It should be set back to true
* after the backport of the backcompat code is complete.
*/
final boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
final boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/34724" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

View File

@ -20,12 +20,15 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
/**
@ -40,16 +43,33 @@ public class CompoundProcessor implements Processor {
private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
}
public CompoundProcessor(Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList());
}
public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
}
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
LongSupplier relativeTimeProvider) {
super();
this.ignoreFailure = ignoreFailure;
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
}
List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
return processorsWithMetrics;
}
public boolean isIgnoreFailure() {
@ -94,12 +114,17 @@ public class CompoundProcessor implements Processor {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Processor processor : processors) {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) {
continue;
}
@ -112,11 +137,15 @@ public class CompoundProcessor implements Processor {
executeOnFailure(ingestDocument, compoundProcessorException);
break;
}
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
return ingestDocument;
}
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);

View File

@ -28,6 +28,8 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
@ -42,12 +44,20 @@ public class ConditionalProcessor extends AbstractProcessor {
private final ScriptService scriptService;
private final Processor processor;
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
this(tag, script, scriptService, processor, System::nanoTime);
}
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
super(tag);
this.condition = script;
this.scriptService = scriptService;
this.processor = processor;
this.metric = new IngestMetric();
this.relativeTimeProvider = relativeTimeProvider;
}
@Override
@ -55,11 +65,30 @@ public class ConditionalProcessor extends AbstractProcessor {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
return processor.execute(ingestDocument);
// Only record metric if the script evaluates to true
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
return processor.execute(ingestDocument);
} catch (Exception e) {
metric.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
return ingestDocument;
}
Processor getProcessor() {
return processor;
}
IngestMetric getMetric() {
return metric;
}
@Override
public String getType() {
return TYPE;

View File

@ -19,19 +19,6 @@
package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@ -49,6 +36,7 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -61,6 +49,19 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* Holder class for several ingest related services.
*/
@ -262,11 +263,59 @@ public class IngestService implements ClusterStateApplier {
Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
}
});
}
}
/**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
* @param compoundProcessor The compound processor to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
*/
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
List<Tuple<Processor, IngestMetric>> processorMetrics) {
//only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
if (processor instanceof CompoundProcessor) {
getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
} else {
//Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor) {
metric = ((ConditionalProcessor) processor).getMetric();
}
processorMetrics.add(new Tuple<>(processor, metric));
}
}
return processorMetrics;
}
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
@ -371,11 +420,42 @@ public class IngestService implements ClusterStateApplier {
}
public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.forEach((id, pipeline) -> {
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
getProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
});
});
return statsBuilder.build();
}
Map<String, IngestStats.Stats> statsPerPipeline =
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
//package private for testing
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
sb.append(":");
sb.append(pipelineName);
}
String tag = processor.getTag();
if(tag != null && !tag.isEmpty()){
sb.append(":");
sb.append(tag);
}
return sb.toString();
}
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -27,17 +28,28 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class IngestStats implements Writeable, ToXContentFragment {
private final Stats totalStats;
private final Map<String, Stats> statsPerPipeline;
private final List<PipelineStat> pipelineStats;
private final Map<String, List<ProcessorStat>> processorStats;
public IngestStats(Stats totalStats, Map<String, Stats> statsPerPipeline) {
/**
* @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats,
* and pipeline stats are logically the sum of the processor stats.
* @param pipelineStats - The stats for a given ingest pipeline.
* @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier.
*/
public IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats) {
this.totalStats = totalStats;
this.statsPerPipeline = statsPerPipeline;
this.pipelineStats = pipelineStats;
this.processorStats = processorStats;
}
/**
@ -46,37 +58,47 @@ public class IngestStats implements Writeable, ToXContentFragment {
public IngestStats(StreamInput in) throws IOException {
this.totalStats = new Stats(in);
int size = in.readVInt();
this.statsPerPipeline = new HashMap<>(size);
this.pipelineStats = new ArrayList<>(size);
this.processorStats = new HashMap<>(size);
for (int i = 0; i < size; i++) {
statsPerPipeline.put(in.readString(), new Stats(in));
String pipelineId = in.readString();
Stats pipelineStat = new Stats(in);
this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat));
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
int processorsSize = in.readVInt();
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
totalStats.writeTo(out);
out.writeVInt(statsPerPipeline.size());
for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
out.writeVInt(pipelineStats.size());
for (PipelineStat pipelineStat : pipelineStats) {
out.writeString(pipelineStat.getPipelineId());
pipelineStat.getStats().writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
if (processorStatsForPipeline == null) {
out.writeVInt(0);
} else {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
processorStat.getStats().writeTo(out);
}
}
}
}
}
/**
* @return The accumulated stats for all pipelines
*/
public Stats getTotalStats() {
return totalStats;
}
/**
* @return The stats on a per pipeline basis
*/
public Map<String, Stats> getStatsPerPipeline() {
return statsPerPipeline;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("ingest");
@ -84,9 +106,21 @@ public class IngestStats implements Writeable, ToXContentFragment {
totalStats.toXContent(builder, params);
builder.endObject();
builder.startObject("pipelines");
for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
builder.startObject(entry.getKey());
entry.getValue().toXContent(builder, params);
for (PipelineStat pipelineStat : pipelineStats) {
builder.startObject(pipelineStat.getPipelineId());
pipelineStat.getStats().toXContent(builder, params);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
builder.startArray("processors");
if (processorStatsForPipeline != null) {
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
}
}
builder.endArray();
builder.endObject();
}
builder.endObject();
@ -94,6 +128,18 @@ public class IngestStats implements Writeable, ToXContentFragment {
return builder;
}
public Stats getTotalStats() {
return totalStats;
}
public List<PipelineStat> getPipelineStats() {
return pipelineStats;
}
public Map<String, List<ProcessorStat>> getProcessorStats() {
return processorStats;
}
public static class Stats implements Writeable, ToXContentFragment {
private final long ingestCount;
@ -134,7 +180,6 @@ public class IngestStats implements Writeable, ToXContentFragment {
}
/**
*
* @return The total time spent of ingest preprocessing in millis.
*/
public long getIngestTimeInMillis() {
@ -164,4 +209,77 @@ public class IngestStats implements Writeable, ToXContentFragment {
return builder;
}
}
/**
* Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects
*/
static class Builder {
private Stats totalStats;
private List<PipelineStat> pipelineStats = new ArrayList<>();
private Map<String, List<ProcessorStat>> processorStats = new HashMap<>();
Builder addTotalMetrics(IngestMetric totalMetric) {
this.totalStats = totalMetric.createStats();
return this;
}
Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats()));
return this;
}
Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
return this;
}
IngestStats build() {
return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats),
Collections.unmodifiableMap(processorStats));
}
}
/**
* Container for pipeline stats.
*/
public static class PipelineStat {
private final String pipelineId;
private final Stats stats;
public PipelineStat(String pipelineId, Stats stats) {
this.pipelineId = pipelineId;
this.stats = stats;
}
public String getPipelineId() {
return pipelineId;
}
public Stats getStats() {
return stats;
}
}
/**
* Container for processor stats.
*/
public static class ProcessorStat {
private final String name;
private final Stats stats;
public ProcessorStat(String name, Stats stats) {
this.name = name;
this.stats = stats;
}
public String getName() {
return name;
}
public Stats getStats() {
return stats;
}
}
}

View File

@ -22,11 +22,12 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import org.elasticsearch.script.ScriptService;
@ -47,20 +48,21 @@ public final class Pipeline {
private final Integer version;
private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics;
private final Clock clock;
private final LongSupplier relativeTimeProvider;
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this(id, description, version, compoundProcessor, Clock.systemUTC());
this(id, description, version, compoundProcessor, System::nanoTime);
}
//package private for testing
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
LongSupplier relativeTimeProvider) {
this.id = id;
this.description = description;
this.compoundProcessor = compoundProcessor;
this.version = version;
this.metrics = new IngestMetric();
this.clock = clock;
this.relativeTimeProvider = relativeTimeProvider;
}
public static Pipeline create(String id, Map<String, Object> config,
@ -89,7 +91,7 @@ public final class Pipeline {
* Modifies the data of a document to be indexed based on the processor this pipeline holds
*/
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
long startTimeInMillis = clock.millis();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metrics.preIngest();
return compoundProcessor.execute(ingestDocument);
@ -97,7 +99,7 @@ public final class Pipeline {
metrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis);
}
}

View File

@ -53,6 +53,10 @@ public class PipelineProcessor extends AbstractProcessor {
return TYPE;
}
String getPipelineName() {
return pipelineName;
}
public static final class Factory implements Processor.Factory {
private final IngestService ingestService;

View File

@ -53,7 +53,6 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class NodeStatsTests extends ESTestCase {
public void testSerialization() throws IOException {
NodeStats nodeStats = createNodeStats();
try (BytesStreamOutput out = new BytesStreamOutput()) {
@ -271,14 +270,29 @@ public class NodeStatsTests extends ESTestCase {
assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent());
assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount());
assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis());
assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size());
for (Map.Entry<String, IngestStats.Stats> entry : ingestStats.getStatsPerPipeline().entrySet()) {
IngestStats.Stats stats = entry.getValue();
IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey());
assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount());
assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis());
assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent());
assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount());
assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size());
for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) {
String pipelineId = pipelineStat.getPipelineId();
IngestStats.Stats deserializedPipelineStats =
getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId);
assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount());
assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis());
assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent());
assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount());
List<IngestStats.ProcessorStat> processorStats = ingestStats.getProcessorStats().get(pipelineId);
//intentionally validating identical order
Iterator<IngestStats.ProcessorStat> it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator();
for (IngestStats.ProcessorStat processorStat : processorStats) {
IngestStats.ProcessorStat deserializedProcessorStat = it.next();
assertEquals(processorStat.getStats().getIngestFailedCount(),
deserializedProcessorStat.getStats().getIngestFailedCount());
assertEquals(processorStat.getStats().getIngestTimeInMillis(),
deserializedProcessorStat.getStats().getIngestTimeInMillis());
assertEquals(processorStat.getStats().getIngestCurrent(),
deserializedProcessorStat.getStats().getIngestCurrent());
assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount());
}
assertFalse(it.hasNext());
}
}
AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats();
@ -429,14 +443,24 @@ public class NodeStatsTests extends ESTestCase {
if (frequently()) {
IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong());
int numPipelines = randomIntBetween(0, 10);
int numProcessors = randomIntBetween(0, 10);
List<IngestStats.PipelineStat> ingestPipelineStats = new ArrayList<>(numPipelines);
Map<String, List<IngestStats.ProcessorStat>> ingestProcessorStats = new HashMap<>(numPipelines);
for (int i = 0; i < numPipelines; i++) {
String pipelineId = randomAlphaOfLengthBetween(3, 10);
ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())));
int numStatsPerPipeline = randomIntBetween(0, 10);
Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>();
for (int i = 0; i < numStatsPerPipeline; i++) {
statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
List<IngestStats.ProcessorStat> processorPerPipeline = new ArrayList<>(numProcessors);
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}
ingestStats = new IngestStats(totalStats, statsPerPipeline);
ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats);
}
AdaptiveSelectionStats adaptiveSelectionStats = null;
if (frequently()) {
@ -465,4 +489,8 @@ public class NodeStatsTests extends ESTestCase {
fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
ingestStats, adaptiveSelectionStats);
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
}

View File

@ -27,11 +27,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@ -49,18 +55,29 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessor() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
TestProcessor processor = new TestProcessor(ingestDocument ->{
assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0);
});
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 0, 1);
}
public void testSingleProcessorWithException() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
@ -71,15 +88,22 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(e.getRootCause().getMessage(), equalTo("error"));
}
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList());
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor =
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertStats(1, compoundProcessor, 0, 1, 0, 0);
assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value"));
}
@ -93,11 +117,15 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2));
Collections.singletonList(processor2), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 1);
assertThat(processor2.getInvokedCounter(), equalTo(1));
}
@ -118,14 +146,17 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
Collections.singletonList(lastProcessor));
Collections.singletonList(lastProcessor), relativeTimeProvider);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor));
Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
@ -137,21 +168,24 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -160,21 +194,24 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor));
Collections.singletonList(failProcessor), relativeTimeProvider);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -183,27 +220,44 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(new CompoundProcessor(failProcessor)));
Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor)));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testBreakOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
Collections.singletonList(onFailureProcessor));
Collections.singletonList(onFailureProcessor), relativeTimeProvider);
pipeline.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
assertStats(pipeline, 1, 1, 0);
}
private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
assertStats(0, compoundProcessor, 0L, count, failed, time);
}
private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) {
IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats();
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(current));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), equalTo(time));
}
}

View File

@ -33,12 +33,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConditionalProcessorTests extends ESTestCase {
@ -60,6 +66,8 @@ public class ConditionalProcessorTests extends ESTestCase {
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Map<String, Object> document = new HashMap<>();
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2));
ConditionalProcessor processor = new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(
@ -67,7 +75,10 @@ public class ConditionalProcessorTests extends ESTestCase {
scriptName, Collections.emptyMap()), scriptService,
new Processor() {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(final IngestDocument ingestDocument){
if(ingestDocument.hasField("error")){
throw new RuntimeException("error");
}
ingestDocument.setFieldValue("foo", "bar");
return ingestDocument;
}
@ -81,20 +92,37 @@ public class ConditionalProcessorTests extends ESTestCase {
public String getTag() {
return null;
}
});
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
}, relativeTimeProvider);
//false, never call processor never increments metrics
String falseValue = "falsy";
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true);
processor.execute(ingestDocument);
assertStats(processor, 0, 0, 0);
//true, always call processor and increments metrics
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
assertStats(processor, 1, 0, 1);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
ingestDocument.setFieldValue("error", true);
IngestDocument finalIngestDocument = ingestDocument;
expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument));
assertStats(processor, 2, 1, 2);
}
@SuppressWarnings("unchecked")
@ -141,5 +169,14 @@ public class ConditionalProcessorTests extends ESTestCase {
Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());
assertStats(processor, 0, 0, 0);
}
private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) {
IngestStats.Stats stats = conditionalProcessor.getMetric().createStats();
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
}
}

View File

@ -63,6 +63,7 @@ import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -746,16 +747,23 @@ public class IngestServiceTests extends ESTestCase {
verify(completionHandler, times(1)).accept(null);
}
public void testStats() {
public void testStats() throws Exception {
final Processor processor = mock(Processor.class);
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"mock", (factories, tag, config) -> processor));
final Processor processorFailure = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
when(processorFailure.getType()).thenReturn("failure-mock");
//avoid returning null and dropping the document
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error"));
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
map.put("failure-mock", (factories, tag, config) -> processorFailure);
IngestService ingestService = createWithProcessors(map);
final IngestStats initialStats = ingestService.stats();
assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0));
assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
assertThat(initialStats.getPipelineStats().size(), equalTo(0));
assertStats(initialStats.getTotalStats(), 0, 0, 0);
PutPipelineRequest putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
@ -769,7 +777,6 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
@ -778,18 +785,33 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L));
assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L));
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag"));
afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag"));
//total
assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0);
//pipeline
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0);
//processor
assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0);
//pipeline
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0);
//update cluster state and ensure that new stats are added to old stats
putRequest = new PutPipelineRequest("_id1",
@ -800,13 +822,66 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L));
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L));
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0);
//pipeline
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0);
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is
//due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each
//processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases,
//like this one it may not readily obvious why the metrics were not carried forward.
assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0);
//test a failure, and that the processor stats are added from the old stats
putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"),
XContentType.JSON);
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterForthRequestStats = ingestService.stats();
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0);
//pipeline
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0);
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed
assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats
assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0);
}
public void testStatName(){
Processor processor = mock(Processor.class);
String name = randomAlphaOfLength(10);
when(processor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(processor), equalTo(name));
String tag = randomAlphaOfLength(10);
when(processor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
when(conditionalProcessor.getProcessor()).thenReturn(processor);
assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
when(pipelineProcessor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag));
}
public void testExecuteWithDrop() {
Map<String, Processor.Factory> factories = new HashMap<>();
factories.put("drop", new DropProcessor.Factory());
@ -935,4 +1010,23 @@ public class IngestServiceTests extends ESTestCase {
return false;
}
}
private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) {
assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time);
}
private void assertPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String pipelineId, long count, long failed, long time) {
assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time);
}
private void assertStats(IngestStats.Stats stats, long count, long failed, long time) {
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
}

View File

@ -19,44 +19,70 @@
package org.elasticsearch.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class IngestStatsTests extends ESTestCase {
public void testSerialization() throws IOException {
IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30);
IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300);
IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo));
IngestStats serialize = serialize(ingestStats);
assertNotSame(serialize, ingestStats);
assertNotSame(serialize.getTotalStats(), total);
assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount());
assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount());
assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis());
assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent());
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
List<IngestStats.PipelineStat> pipelineStats = createPipelineStats();
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats);
IngestStats serializedStats = serialize(ingestStats);
assertIngestStats(ingestStats, serializedStats, true);
}
assertEquals(ingestStats.getStatsPerPipeline().size(), 1);
assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo"));
public void testReadLegacyStream() throws IOException {
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
List<IngestStats.PipelineStat> pipelineStats = createPipelineStats();
Map<String, IngestStats.Stats> left = ingestStats.getStatsPerPipeline();
Map<String, IngestStats.Stats> right = serialize.getStatsPerPipeline();
//legacy output logic
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0));
totalStats.writeTo(out);
out.writeVInt(pipelineStats.size());
for (IngestStats.PipelineStat pipelineStat : pipelineStats) {
out.writeString(pipelineStat.getPipelineId());
pipelineStat.getStats().writeTo(out);
}
assertEquals(right.size(), 1);
assertTrue(right.containsKey("foo"));
assertEquals(left.size(), 1);
assertTrue(left.containsKey("foo"));
IngestStats.Stats leftStats = left.get("foo");
IngestStats.Stats rightStats = right.get("foo");
assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount());
assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount());
assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis());
assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent());
StreamInput in = out.bytes().streamInput();
in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0));
IngestStats serializedStats = new IngestStats(in);
IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap());
assertIngestStats(expectedStats, serializedStats, false);
}
private List<IngestStats.PipelineStat> createPipelineStats() {
IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3));
IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0));
return Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList());
}
private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
.put(pipelineStats.get(1).getPipelineId(), Collections.singletonList(processor3Stat))
.map();
}
private IngestStats serialize(IngestStats stats) throws IOException {
@ -65,4 +91,48 @@ public class IngestStatsTests extends ESTestCase {
StreamInput in = out.bytes().streamInput();
return new IngestStats(in);
}
private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){
assertNotSame(ingestStats, serializedStats);
assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats());
assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats());
assertNotSame(ingestStats.getProcessorStats(), serializedStats.getProcessorStats());
assertStats(ingestStats.getTotalStats(), serializedStats.getTotalStats());
assertEquals(ingestStats.getPipelineStats().size(), serializedStats.getPipelineStats().size());
for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) {
assertStats(getPipelineStats(ingestStats.getPipelineStats(), serializedPipelineStat.getPipelineId()),
serializedPipelineStat.getStats());
List<IngestStats.ProcessorStat> serializedProcessorStats =
serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId());
List<IngestStats.ProcessorStat> processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId());
if(expectProcessors) {
if (processorStat != null) {
Iterator<IngestStats.ProcessorStat> it = processorStat.iterator();
//intentionally enforcing the identical ordering
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());
}
}else{
//pre 6.5 did not serialize any processor stats
assertNull(serializedProcessorStats);
}
}
}
private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) {
assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount());
assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount());
assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis());
assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent());
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
}

View File

@ -21,12 +21,13 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
@ -143,15 +144,15 @@ public class PipelineProcessorTests extends ESTestCase {
pipeline2ProcessorConfig.put("name", pipeline3Id);
PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
Pipeline pipeline1 = new Pipeline(
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider
);
String key1 = randomAlphaOfLength(10);
clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L).thenReturn(3L);
relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3));
Pipeline pipeline2 = new Pipeline(
pipeline2Id, null, null, new CompoundProcessor(true,
Arrays.asList(
@ -160,15 +161,15 @@ public class PipelineProcessorTests extends ESTestCase {
}),
pipeline2Processor),
Collections.emptyList()),
clock
relativeTimeProvider
);
clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L).thenReturn(2L);
relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2));
Pipeline pipeline3 = new Pipeline(
pipeline3Id, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> {
throw new RuntimeException("error");
})), clock
})), relativeTimeProvider
);
when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);