ingest: processor stats (#34202)

This change introduces stats per processor. Total, time, failed, and
current are currently supported. Each pipeline will now report all of
the top-level processors that belong to it. Failure processors are not
displayed; however, the time taken to execute the failure chain is
included in the stats of the top-level processor.

The processor name is the type of the processor, and processors are
listed in the order defined in the pipeline. If a tag is found for the
processor, the tag is appended to the type.

Pipeline processors will have the pipeline name appended to the
processor name (before the tag, if one exists). If more than one
pipeline is used to process the document, each pipeline will carry its
own stats. The outermost pipeline will also include the innermost
pipeline's stats.

Conditional processors will only be included in the stats if the
condition evaluates to true.
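For orientation, here is a minimal, hypothetical sketch of how the new per-pipeline and per-processor stats can be read through the IngestStats API introduced below (the helper class, the supplied IngestService instance, and the example processor names are assumptions for illustration, not part of this change):

import java.util.List;

import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.IngestStats;

// Sketch only: prints the node-wide totals plus the new per-processor breakdown.
final class IngestStatsPrinter {
    static void print(IngestService ingestService) {
        IngestStats stats = ingestService.stats();
        IngestStats.Stats total = stats.getTotalStats();
        System.out.printf("total: count=%d time_ms=%d failed=%d current=%d%n",
            total.getIngestCount(), total.getIngestTimeInMillis(),
            total.getIngestFailedCount(), total.getIngestCurrent());
        for (IngestStats.PipelineStat pipelineStat : stats.getPipelineStats()) {
            String pipelineId = pipelineStat.getPipelineId();
            // Processor names follow the convention described above, e.g. "set", "set:my_tag",
            // or "pipeline:my_inner_pipeline" (these names are made up for the example).
            List<IngestStats.ProcessorStat> processorStats = stats.getProcessorStats().get(pipelineId);
            for (IngestStats.ProcessorStat processorStat : processorStats) {
                System.out.printf("%s/%s: count=%d time_ms=%d failed=%d current=%d%n",
                    pipelineId, processorStat.getName(),
                    processorStat.getStats().getIngestCount(),
                    processorStat.getStats().getIngestTimeInMillis(),
                    processorStat.getStats().getIngestFailedCount(),
                    processorStat.getStats().getIngestCurrent());
            }
        }
    }
}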
Jake Landis committed 2018-10-20 16:01:01 -05:00 (committed by GitHub)
parent dca452f043 · commit 6567729600
12 changed files with 654 additions and 148 deletions

View File

@@ -20,12 +20,15 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
@@ -40,16 +43,33 @@ public class CompoundProcessor implements Processor {
    private final boolean ignoreFailure;
    private final List<Processor> processors;
    private final List<Processor> onFailureProcessors;
+    private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
+    private final LongSupplier relativeTimeProvider;
+
+    CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
+        this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
+    }

    public CompoundProcessor(Processor... processor) {
        this(false, Arrays.asList(processor), Collections.emptyList());
    }

    public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
+        this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
+    }
+
+    CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
+                      LongSupplier relativeTimeProvider) {
        super();
        this.ignoreFailure = ignoreFailure;
        this.processors = processors;
        this.onFailureProcessors = onFailureProcessors;
+        this.relativeTimeProvider = relativeTimeProvider;
+        this.processorsWithMetrics = new ArrayList<>(processors.size());
+        processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
+    }
+
+    List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
+        return processorsWithMetrics;
    }

    public boolean isIgnoreFailure() {
@@ -94,12 +114,17 @@ public class CompoundProcessor implements Processor {
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
-        for (Processor processor : processors) {
+        for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
+            Processor processor = processorWithMetric.v1();
+            IngestMetric metric = processorWithMetric.v2();
+            long startTimeInNanos = relativeTimeProvider.getAsLong();
            try {
+                metric.preIngest();
                if (processor.execute(ingestDocument) == null) {
                    return null;
                }
            } catch (Exception e) {
+                metric.ingestFailed();
                if (ignoreFailure) {
                    continue;
                }
@@ -112,11 +137,15 @@ public class CompoundProcessor implements Processor {
                    executeOnFailure(ingestDocument, compoundProcessorException);
                    break;
                }
+            } finally {
+                long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
+                metric.postIngest(ingestTimeInMillis);
            }
        }
        return ingestDocument;
    }

    void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
        try {
            putFailureMetadata(ingestDocument, exception);
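The IngestMetric class that backs these counters already exists in the ingest package and is not part of this diff. As a rough, simplified sketch of the contract the code above relies on (the class name and the use of plain atomics here are illustrative assumptions; the real implementation uses Elasticsearch's internal metric classes):

import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.ingest.IngestStats;

// Illustrative stand-in for org.elasticsearch.ingest.IngestMetric, not the actual implementation.
class SketchIngestMetric {
    private final AtomicLong count = new AtomicLong();        // documents that finished ingestion
    private final AtomicLong timeInMillis = new AtomicLong(); // total time spent ingesting
    private final AtomicLong current = new AtomicLong();      // documents currently being ingested
    private final AtomicLong failed = new AtomicLong();       // documents that failed

    void preIngest() {
        current.incrementAndGet();
    }

    void postIngest(long ingestTimeInMillis) {
        current.decrementAndGet();
        count.incrementAndGet();
        timeInMillis.addAndGet(ingestTimeInMillis);
    }

    void ingestFailed() {
        failed.incrementAndGet();
    }

    // Mirrors IngestMetric#add, used when carrying metrics over across pipeline updates.
    void add(SketchIngestMetric other) {
        count.addAndGet(other.count.get());
        timeInMillis.addAndGet(other.timeInMillis.get());
        failed.addAndGet(other.failed.get());
    }

    IngestStats.Stats createStats() {
        return new IngestStats.Stats(count.get(), timeInMillis.get(), current.get(), failed.get());
    }
}

Because postIngest runs in the finally block above, a failed execution still counts toward the total count and time, which is why the tests further down assert count=1 together with failed=1 for a processor that threw.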

View File

@@ -28,6 +28,8 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
@@ -42,12 +44,20 @@ public class ConditionalProcessor extends AbstractProcessor {

    private final ScriptService scriptService;
    private final Processor processor;
+    private final IngestMetric metric;
+    private final LongSupplier relativeTimeProvider;

    ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
+        this(tag, script, scriptService, processor, System::nanoTime);
+    }
+
+    ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
        super(tag);
        this.condition = script;
        this.scriptService = scriptService;
        this.processor = processor;
+        this.metric = new IngestMetric();
+        this.relativeTimeProvider = relativeTimeProvider;
    }

    @Override
@@ -55,11 +65,30 @@ public class ConditionalProcessor extends AbstractProcessor {
        IngestConditionalScript script =
            scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
        if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
-            return processor.execute(ingestDocument);
+            // Only record metric if the script evaluates to true
+            long startTimeInNanos = relativeTimeProvider.getAsLong();
+            try {
+                metric.preIngest();
+                return processor.execute(ingestDocument);
+            } catch (Exception e) {
+                metric.ingestFailed();
+                throw e;
+            } finally {
+                long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
+                metric.postIngest(ingestTimeInMillis);
+            }
        }
        return ingestDocument;
    }

+    Processor getProcessor() {
+        return processor;
+    }
+
+    IngestMetric getMetric() {
+        return metric;
+    }
+
    @Override
    public String getType() {
        return TYPE;

View File

@@ -19,19 +19,6 @@
package org.elasticsearch.ingest;

-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@@ -49,6 +36,7 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -61,6 +49,19 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
/**
 * Holder class for several ingest related services.
 */
@@ -262,11 +263,59 @@ public class IngestService implements ClusterStateApplier {
                Pipeline originalPipeline = originalPipelines.get(id);
                if (originalPipeline != null) {
                    pipeline.getMetrics().add(originalPipeline.getMetrics());
+                    List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
+                    List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
+                    getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
+                    getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
+                    //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
+                    //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
+                    //consistent id's per processor and/or semantic equals for each processor will be needed.
+                    if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
+                        Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
+                        for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
+                            String type = compositeMetric.v1().getType();
+                            IngestMetric metric = compositeMetric.v2();
+                            if (oldMetricsIterator.hasNext()) {
+                                Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
+                                String oldType = oldCompositeMetric.v1().getType();
+                                IngestMetric oldMetric = oldCompositeMetric.v2();
+                                if (type.equals(oldType)) {
+                                    metric.add(oldMetric);
+                                }
+                            }
+                        }
+                    }
                }
            });
        }
    }
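To make the best-effort carry-over above concrete, the sketch below mirrors the same position-and-type comparison with plain strings standing in for processors (purely illustrative; the real code walks the Tuple<Processor, IngestMetric> lists exactly as shown above):

import java.util.Arrays;
import java.util.List;

// Illustrative only: which per-processor metrics survive a pipeline update.
public class MetricCarryOverSketch {
    public static void main(String[] args) {
        List<String> oldTypes = Arrays.asList("set", "rename", "remove");

        // Same length, same type at every position -> every old metric is carried over.
        System.out.println(carriesOver(oldTypes, Arrays.asList("set", "rename", "remove"))); // [true, true, true]

        // Same length but a different type in the middle -> only the matching positions carry over.
        System.out.println(carriesOver(oldTypes, Arrays.asList("set", "grok", "remove")));   // [true, false, true]

        // Different length -> nothing carries over; the per-processor metrics start from zero.
        System.out.println(carriesOver(oldTypes, Arrays.asList("set", "rename")));           // [false, false]
    }

    static List<Boolean> carriesOver(List<String> oldTypes, List<String> newTypes) {
        Boolean[] result = new Boolean[newTypes.size()];
        for (int i = 0; i < newTypes.size(); i++) {
            result[i] = newTypes.size() == oldTypes.size() && newTypes.get(i).equals(oldTypes.get(i));
        }
        return Arrays.asList(result);
    }
}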
+    /**
+     * Recursive method to obtain all of the non-failure processors for the given compoundProcessor. Since conditionals are implemented
+     * as wrappers around the actual processor, the conditional processor's own metric is used for the wrapped processor: it only
+     * records executions where the condition evaluated to true.
+     * @param compoundProcessor The compound processor to start walking the non-failure processors
+     * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
+     * @return the processorMetrics for all non-failure processors that belong to the original compoundProcessor
+     */
+    private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
+                                                                            List<Tuple<Processor, IngestMetric>> processorMetrics) {
+        //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
+        for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
+            Processor processor = processorWithMetric.v1();
+            IngestMetric metric = processorWithMetric.v2();
+            if (processor instanceof CompoundProcessor) {
+                getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
+            } else {
+                //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
+                if (processor instanceof ConditionalProcessor) {
+                    metric = ((ConditionalProcessor) processor).getMetric();
+                }
+                processorMetrics.add(new Tuple<>(processor, metric));
+            }
+        }
+        return processorMetrics;
+    }
    private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
        String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
        String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
@@ -371,11 +420,42 @@ public class IngestService implements ClusterStateApplier {
    }

    public IngestStats stats() {
-        Map<String, IngestStats.Stats> statsPerPipeline =
-            pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
-
-        return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
+        IngestStats.Builder statsBuilder = new IngestStats.Builder();
+        statsBuilder.addTotalMetrics(totalMetrics);
+        pipelines.forEach((id, pipeline) -> {
+            CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
+            statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
+            List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
+            getProcessorMetrics(rootProcessor, processorMetrics);
+            processorMetrics.forEach(t -> {
+                Processor processor = t.v1();
+                IngestMetric processorMetric = t.v2();
+                statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
+            });
+        });
+        return statsBuilder.build();
+    }
+
+    //package private for testing
+    static String getProcessorName(Processor processor){
+        // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
+        if(processor instanceof ConditionalProcessor){
+            processor = ((ConditionalProcessor) processor).getProcessor();
+        }
+        StringBuilder sb = new StringBuilder(5);
+        sb.append(processor.getType());
+        if(processor instanceof PipelineProcessor){
+            String pipelineName = ((PipelineProcessor) processor).getPipelineName();
+            sb.append(":");
+            sb.append(pipelineName);
+        }
+        String tag = processor.getTag();
+        if(tag != null && !tag.isEmpty()){
+            sb.append(":");
+            sb.append(tag);
+        }
+        return sb.toString();
    }
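Given getProcessorName above, the names that show up in the per-pipeline processor list look like the following hypothetical test-style sketch (it reuses the existing TestProcessor helper from the same package; the tags and pipeline names are made up):

// Hypothetical examples, not part of this change.
public void testProcessorNameExamples() {
    Processor untagged = new TestProcessor(null, "set", doc -> {});
    assertEquals("set", IngestService.getProcessorName(untagged));

    Processor tagged = new TestProcessor("set_foo", "set", doc -> {});
    assertEquals("set:set_foo", IngestService.getProcessorName(tagged));

    // A pipeline processor that invokes a pipeline named "inner-pipeline" and carries the tag
    // "my_tag" would be reported as "pipeline:inner-pipeline:my_tag", and a processor wrapped
    // in an if-condition is reported under the wrapped processor's own type.
}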
    private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {

View File

@@ -27,17 +27,28 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class IngestStats implements Writeable, ToXContentFragment {
    private final Stats totalStats;
-    private final Map<String, Stats> statsPerPipeline;
+    private final List<PipelineStat> pipelineStats;
+    private final Map<String, List<ProcessorStat>> processorStats;

-    public IngestStats(Stats totalStats, Map<String, Stats> statsPerPipeline) {
+    /**
+     * @param totalStats - The total stats for Ingest. This is logically the sum of all pipeline stats,
+     *                     and pipeline stats are logically the sum of the processor stats.
+     * @param pipelineStats - The stats for a given ingest pipeline.
+     * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier.
+     */
+    public IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats) {
        this.totalStats = totalStats;
-        this.statsPerPipeline = statsPerPipeline;
+        this.pipelineStats = pipelineStats;
+        this.processorStats = processorStats;
    }

    /**
@@ -46,37 +57,43 @@ public class IngestStats implements Writeable, ToXContentFragment {
    public IngestStats(StreamInput in) throws IOException {
        this.totalStats = new Stats(in);
        int size = in.readVInt();
-        this.statsPerPipeline = new HashMap<>(size);
+        this.pipelineStats = new ArrayList<>(size);
+        this.processorStats = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
-            statsPerPipeline.put(in.readString(), new Stats(in));
+            String pipelineId = in.readString();
+            Stats pipelineStat = new Stats(in);
+            this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat));
+            int processorsSize = in.readVInt();
+            List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
+            for (int j = 0; j < processorsSize; j++) {
+                String processorName = in.readString();
+                Stats processorStat = new Stats(in);
+                processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
+            }
+            this.processorStats.put(pipelineId, processorStatsPerPipeline);
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        totalStats.writeTo(out);
-        out.writeVInt(statsPerPipeline.size());
-        for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
-            out.writeString(entry.getKey());
-            entry.getValue().writeTo(out);
+        out.writeVInt(pipelineStats.size());
+        for (PipelineStat pipelineStat : pipelineStats) {
+            out.writeString(pipelineStat.getPipelineId());
+            pipelineStat.getStats().writeTo(out);
+            List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
+            if(processorStatsForPipeline == null) {
+                out.writeVInt(0);
+            }else{
+                out.writeVInt(processorStatsForPipeline.size());
+                for (ProcessorStat processorStat : processorStatsForPipeline) {
+                    out.writeString(processorStat.getName());
+                    processorStat.getStats().writeTo(out);
+                }
+            }
        }
    }

-    /**
-     * @return The accumulated stats for all pipelines
-     */
-    public Stats getTotalStats() {
-        return totalStats;
-    }
-
-    /**
-     * @return The stats on a per pipeline basis
-     */
-    public Map<String, Stats> getStatsPerPipeline() {
-        return statsPerPipeline;
-    }
-
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject("ingest");
@@ -84,9 +101,21 @@ public class IngestStats implements Writeable, ToXContentFragment {
        totalStats.toXContent(builder, params);
        builder.endObject();
        builder.startObject("pipelines");
-        for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
-            builder.startObject(entry.getKey());
-            entry.getValue().toXContent(builder, params);
+        for (PipelineStat pipelineStat : pipelineStats) {
+            builder.startObject(pipelineStat.getPipelineId());
+            pipelineStat.getStats().toXContent(builder, params);
+            List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
+            builder.startArray("processors");
+            if (processorStatsForPipeline != null) {
+                for (ProcessorStat processorStat : processorStatsForPipeline) {
+                    builder.startObject();
+                    builder.startObject(processorStat.getName());
+                    processorStat.getStats().toXContent(builder, params);
+                    builder.endObject();
+                    builder.endObject();
+                }
+            }
+            builder.endArray();
            builder.endObject();
        }
        builder.endObject();
@@ -94,6 +123,18 @@ public class IngestStats implements Writeable, ToXContentFragment {
        return builder;
    }

+    public Stats getTotalStats() {
+        return totalStats;
+    }
+
+    public List<PipelineStat> getPipelineStats() {
+        return pipelineStats;
+    }
+
+    public Map<String, List<ProcessorStat>> getProcessorStats() {
+        return processorStats;
+    }
+
    public static class Stats implements Writeable, ToXContentFragment {

        private final long ingestCount;
@@ -134,7 +175,6 @@ public class IngestStats implements Writeable, ToXContentFragment {
        }

        /**
-         *
         * @return The total time spent of ingest preprocessing in millis.
         */
        public long getIngestTimeInMillis() {
@@ -164,4 +204,77 @@ public class IngestStats implements Writeable, ToXContentFragment {
            return builder;
        }
    }
+
+    /**
+     * Easy conversion from scoped {@link IngestMetric} objects to serializable Stats objects.
+     */
+    static class Builder {
+        private Stats totalStats;
+        private List<PipelineStat> pipelineStats = new ArrayList<>();
+        private Map<String, List<ProcessorStat>> processorStats = new HashMap<>();
+
+        Builder addTotalMetrics(IngestMetric totalMetric) {
+            this.totalStats = totalMetric.createStats();
+            return this;
+        }
+
+        Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
+            this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats()));
+            return this;
+        }
+
+        Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
+            this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
+                .add(new ProcessorStat(processorName, metric.createStats()));
+            return this;
+        }
+
+        IngestStats build() {
+            return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats),
+                Collections.unmodifiableMap(processorStats));
+        }
+    }
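A quick illustration of the Builder above (a sketch only; since the Builder is package-private this would live in the org.elasticsearch.ingest package, e.g. in a test, and the pipeline id and processor name are hypothetical):

// Assemble an IngestStats from per-scope IngestMetric instances.
IngestMetric totalMetric = new IngestMetric();      // node-wide metric
IngestMetric pipelineMetric = new IngestMetric();   // metric for pipeline "my-pipeline"
IngestMetric setMetric = new IngestMetric();        // metric for its "set:set_foo" processor

IngestStats stats = new IngestStats.Builder()
    .addTotalMetrics(totalMetric)
    .addPipelineMetrics("my-pipeline", pipelineMetric)
    .addProcessorMetrics("my-pipeline", "set:set_foo", setMetric)
    .build();

assert "set:set_foo".equals(stats.getProcessorStats().get("my-pipeline").get(0).getName());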
+
+    /**
+     * Container for pipeline stats.
+     */
+    public static class PipelineStat {
+        private final String pipelineId;
+        private final Stats stats;
+
+        public PipelineStat(String pipelineId, Stats stats) {
+            this.pipelineId = pipelineId;
+            this.stats = stats;
+        }
+
+        public String getPipelineId() {
+            return pipelineId;
+        }
+
+        public Stats getStats() {
+            return stats;
+        }
+    }
+
+    /**
+     * Container for processor stats.
+     */
+    public static class ProcessorStat {
+        private final String name;
+        private final Stats stats;
+
+        public ProcessorStat(String name, Stats stats) {
+            this.name = name;
+            this.stats = stats;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Stats getStats() {
+            return stats;
+        }
+    }
}

View File

@@ -22,11 +22,12 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;

-import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;

import org.elasticsearch.script.ScriptService;

@@ -47,20 +48,21 @@ public final class Pipeline {
    private final Integer version;
    private final CompoundProcessor compoundProcessor;
    private final IngestMetric metrics;
-    private final Clock clock;
+    private final LongSupplier relativeTimeProvider;

    public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
-        this(id, description, version, compoundProcessor, Clock.systemUTC());
+        this(id, description, version, compoundProcessor, System::nanoTime);
    }

    //package private for testing
-    Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
+    Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
+             LongSupplier relativeTimeProvider) {
        this.id = id;
        this.description = description;
        this.compoundProcessor = compoundProcessor;
        this.version = version;
        this.metrics = new IngestMetric();
-        this.clock = clock;
+        this.relativeTimeProvider = relativeTimeProvider;
    }

    public static Pipeline create(String id, Map<String, Object> config,
@@ -89,7 +91,7 @@ public final class Pipeline {
     * Modifies the data of a document to be indexed based on the processor this pipeline holds
     */
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
-        long startTimeInMillis = clock.millis();
+        long startTimeInNanos = relativeTimeProvider.getAsLong();
        try {
            metrics.preIngest();
            return compoundProcessor.execute(ingestDocument);
@@ -97,7 +99,7 @@ public final class Pipeline {
            metrics.ingestFailed();
            throw e;
        } finally {
-            long ingestTimeInMillis = clock.millis() - startTimeInMillis;
+            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
            metrics.postIngest(ingestTimeInMillis);
        }
    }
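The move from Clock to an injectable LongSupplier of relative nanos is mainly about monotonic timing (System.nanoTime is unaffected by wall-clock adjustments) and testability: a stubbed supplier makes elapsed times deterministic, which is how the mocked relativeTimeProvider is used in the test changes below. A minimal sketch of the same pattern, with made-up names:

import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative only: the timing pattern used by Pipeline and CompoundProcessor above.
final class TimedTask {
    private final LongSupplier relativeTimeProvider; // pass System::nanoTime in production
    private long lastElapsedMillis;

    TimedTask(LongSupplier relativeTimeProvider) {
        this.relativeTimeProvider = relativeTimeProvider;
    }

    void run(Runnable task) {
        long startNanos = relativeTimeProvider.getAsLong();
        try {
            task.run();
        } finally {
            // Elapsed time is recorded even when the task throws, mirroring the ingest code above.
            lastElapsedMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startNanos);
        }
    }

    long lastElapsedMillis() {
        return lastElapsedMillis;
    }

    public static void main(String[] args) {
        // Deterministic timing: the first call returns 0, the second returns 5 ms worth of nanos.
        long[] ticks = {0L, TimeUnit.MILLISECONDS.toNanos(5)};
        int[] next = {0};
        TimedTask task = new TimedTask(() -> ticks[next[0]++]);
        task.run(() -> {});
        System.out.println(task.lastElapsedMillis()); // 5
    }
}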

View File

@@ -53,6 +53,10 @@ public class PipelineProcessor extends AbstractProcessor {
        return TYPE;
    }

+    String getPipelineName() {
+        return pipelineName;
+    }
+
    public static final class Factory implements Processor.Factory {

        private final IngestService ingestService;

View File

@@ -53,7 +53,6 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class NodeStatsTests extends ESTestCase {
-
    public void testSerialization() throws IOException {
        NodeStats nodeStats = createNodeStats();
        try (BytesStreamOutput out = new BytesStreamOutput()) {
@@ -271,14 +270,29 @@ public class NodeStatsTests extends ESTestCase {
                assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent());
                assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount());
                assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis());
-                assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size());
-                for (Map.Entry<String, IngestStats.Stats> entry : ingestStats.getStatsPerPipeline().entrySet()) {
-                    IngestStats.Stats stats = entry.getValue();
-                    IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey());
-                    assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount());
-                    assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis());
-                    assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent());
-                    assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount());
+                assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size());
+                for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) {
+                    String pipelineId = pipelineStat.getPipelineId();
+                    IngestStats.Stats deserializedPipelineStats =
+                        getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId);
+                    assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount());
+                    assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis());
+                    assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent());
+                    assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount());
+                    List<IngestStats.ProcessorStat> processorStats = ingestStats.getProcessorStats().get(pipelineId);
+                    //intentionally validating identical order
+                    Iterator<IngestStats.ProcessorStat> it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator();
+                    for (IngestStats.ProcessorStat processorStat : processorStats) {
+                        IngestStats.ProcessorStat deserializedProcessorStat = it.next();
+                        assertEquals(processorStat.getStats().getIngestFailedCount(),
                            deserializedProcessorStat.getStats().getIngestFailedCount());
+                        assertEquals(processorStat.getStats().getIngestTimeInMillis(),
                            deserializedProcessorStat.getStats().getIngestTimeInMillis());
+                        assertEquals(processorStat.getStats().getIngestCurrent(),
                            deserializedProcessorStat.getStats().getIngestCurrent());
+                        assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount());
+                    }
+                    assertFalse(it.hasNext());
                }
            }
            AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats();
@@ -429,14 +443,24 @@ public class NodeStatsTests extends ESTestCase {
        if (frequently()) {
            IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
                randomNonNegativeLong());
-            int numStatsPerPipeline = randomIntBetween(0, 10);
-            Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>();
-            for (int i = 0; i < numStatsPerPipeline; i++) {
-                statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(),
-                    randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
+            int numPipelines = randomIntBetween(0, 10);
+            int numProcessors = randomIntBetween(0, 10);
+            List<IngestStats.PipelineStat> ingestPipelineStats = new ArrayList<>(numPipelines);
+            Map<String, List<IngestStats.ProcessorStat>> ingestProcessorStats = new HashMap<>(numPipelines);
+            for (int i = 0; i < numPipelines; i++) {
+                String pipelineId = randomAlphaOfLengthBetween(3, 10);
+                ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats
                    (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())));
+
+                List<IngestStats.ProcessorStat> processorPerPipeline = new ArrayList<>(numProcessors);
+                for (int j =0; j < numProcessors;j++) {
+                    IngestStats.Stats processorStats = new IngestStats.Stats
                        (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
+                    processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
+                }
+                ingestProcessorStats.put(pipelineId,processorPerPipeline);
            }
-            ingestStats = new IngestStats(totalStats, statsPerPipeline);
+            ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats);
        }
        AdaptiveSelectionStats adaptiveSelectionStats = null;
        if (frequently()) {
@@ -465,4 +489,8 @@ public class NodeStatsTests extends ESTestCase {
            fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
            ingestStats, adaptiveSelectionStats);
    }
+
+    private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
+        return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
+    }
}

View File

@@ -27,11 +27,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

public class CompoundProcessorTests extends ESTestCase {

    private IngestDocument ingestDocument;
@@ -49,18 +55,29 @@ public class CompoundProcessorTests extends ESTestCase {
    }

    public void testSingleProcessor() throws Exception {
-        TestProcessor processor = new TestProcessor(ingestDocument -> {});
-        CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
+        TestProcessor processor = new TestProcessor(ingestDocument ->{
+            assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0);
+        });
+        CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
+        ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1
        assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
        assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
+        assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
        assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
        compoundProcessor.execute(ingestDocument);
+        verify(relativeTimeProvider, times(2)).getAsLong();
        assertThat(processor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 0, 1);
    }

    public void testSingleProcessorWithException() throws Exception {
        TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
-        CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
        assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
        assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
        assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
@@ -71,15 +88,22 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(e.getRootCause().getMessage(), equalTo("error"));
        }
        assertThat(processor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
    }

    public void testIgnoreFailure() throws Exception {
        TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
        TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
-        CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList());
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor =
+            new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);
        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(0, compoundProcessor, 0, 1, 1, 0);
        assertThat(processor2.getInvokedCounter(), equalTo(1));
+        assertStats(1, compoundProcessor, 0, 1, 0, 0);
        assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value"));
    }

@@ -93,11 +117,15 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
        });

+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
-            Collections.singletonList(processor2));
+            Collections.singletonList(processor2), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);
+        verify(relativeTimeProvider, times(2)).getAsLong();

        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 1);
        assertThat(processor2.getInvokedCounter(), equalTo(1));
    }

@@ -118,14 +146,17 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
        });

+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
        CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
-            Collections.singletonList(lastProcessor));
+            Collections.singletonList(lastProcessor), relativeTimeProvider);
        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
-            Collections.singletonList(compoundOnFailProcessor));
+            Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);

        assertThat(processorToFail.getInvokedCounter(), equalTo(1));
        assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
    }

    public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
@@ -137,21 +168,24 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
        });

+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
-        CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);
+        CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor);

        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-            Collections.singletonList(secondProcessor));
+            Collections.singletonList(secondProcessor), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);

        assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
        assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
    }

    public void testCompoundProcessorExceptionFail() throws Exception {
        TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
        TestProcessor failProcessor =
            new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
        TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
            Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
            assertThat(ingestMetadata.entrySet(), hasSize(3));
@@ -160,21 +194,24 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
        });

+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
        CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
-            Collections.singletonList(failProcessor));
+            Collections.singletonList(failProcessor), relativeTimeProvider);

        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-            Collections.singletonList(secondProcessor));
+            Collections.singletonList(secondProcessor), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);

        assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
        assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
    }

    public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
        TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
        TestProcessor failProcessor =
            new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
        TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
            Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
            assertThat(ingestMetadata.entrySet(), hasSize(3));
@@ -183,27 +220,44 @@ public class CompoundProcessorTests extends ESTestCase {
            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
        });

+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
        CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
-            Collections.singletonList(new CompoundProcessor(failProcessor)));
+            Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor)));

        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-            Collections.singletonList(secondProcessor));
+            Collections.singletonList(secondProcessor), relativeTimeProvider);
        compoundProcessor.execute(ingestDocument);

        assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
        assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
+        assertStats(compoundProcessor, 1, 1, 0);
    }

    public void testBreakOnFailure() throws Exception {
        TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
        TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
        TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
        CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
-            Collections.singletonList(onFailureProcessor));
+            Collections.singletonList(onFailureProcessor), relativeTimeProvider);
        pipeline.execute(ingestDocument);
        assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
        assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
        assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
+        assertStats(pipeline, 1, 1, 0);
+    }
+
+    private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
+        assertStats(0, compoundProcessor, 0L, count, failed, time);
+    }
+
+    private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) {
+        IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats();
+        assertThat(stats.getIngestCount(), equalTo(count));
+        assertThat(stats.getIngestCurrent(), equalTo(current));
+        assertThat(stats.getIngestFailedCount(), equalTo(failed));
+        assertThat(stats.getIngestTimeInMillis(), equalTo(time));
    }
}

View File

@ -33,12 +33,18 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConditionalProcessorTests extends ESTestCase { public class ConditionalProcessorTests extends ESTestCase {
@ -60,6 +66,8 @@ public class ConditionalProcessorTests extends ESTestCase {
new HashMap<>(ScriptModule.CORE_CONTEXTS) new HashMap<>(ScriptModule.CORE_CONTEXTS)
); );
Map<String, Object> document = new HashMap<>(); Map<String, Object> document = new HashMap<>();
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2));
ConditionalProcessor processor = new ConditionalProcessor( ConditionalProcessor processor = new ConditionalProcessor(
randomAlphaOfLength(10), randomAlphaOfLength(10),
new Script( new Script(
@ -67,7 +75,10 @@ public class ConditionalProcessorTests extends ESTestCase {
scriptName, Collections.emptyMap()), scriptService, scriptName, Collections.emptyMap()), scriptService,
new Processor() { new Processor() {
@Override @Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { public IngestDocument execute(final IngestDocument ingestDocument){
if(ingestDocument.hasField("error")){
throw new RuntimeException("error");
}
ingestDocument.setFieldValue("foo", "bar"); ingestDocument.setFieldValue("foo", "bar");
return ingestDocument; return ingestDocument;
} }
@ -81,20 +92,37 @@ public class ConditionalProcessorTests extends ESTestCase {
public String getTag() { public String getTag() {
return null; return null;
} }
}); }, relativeTimeProvider);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
//false, never call processor never increments metrics
String falseValue = "falsy"; String falseValue = "falsy";
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true);
processor.execute(ingestDocument);
assertStats(processor, 0, 0, 0);
//true, always call processor and increments metrics
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
assertStats(processor, 1, 0, 1);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue);
ingestDocument.setFieldValue("error", true);
IngestDocument finalIngestDocument = ingestDocument;
expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument));
assertStats(processor, 2, 1, 2);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -141,5 +169,14 @@ public class ConditionalProcessorTests extends ESTestCase {
Exception e = expectedException.get(); Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class)); assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());
assertStats(processor, 0, 0, 0);
}
private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) {
IngestStats.Stats stats = conditionalProcessor.getMetric().createStats();
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
}
}

@@ -63,6 +63,7 @@ import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -746,16 +747,23 @@ public class IngestServiceTests extends ESTestCase {
verify(completionHandler, times(1)).accept(null);
}
public void testStats() throws Exception {
final Processor processor = mock(Processor.class);
final Processor processorFailure = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
when(processorFailure.getType()).thenReturn("failure-mock");
//avoid returning null and dropping the document
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error"));
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
map.put("failure-mock", (factories, tag, config) -> processorFailure);
IngestService ingestService = createWithProcessors(map);
final IngestStats initialStats = ingestService.stats();
assertThat(initialStats.getPipelineStats().size(), equalTo(0));
assertStats(initialStats.getTotalStats(), 0, 0, 0);
PutPipelineRequest putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
@@ -769,7 +777,6 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
@@ -778,18 +785,33 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag"));
afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag"));
//total
assertStats(afterFirstRequestStats.getTotalStats(), 1, 0, 0);
//pipeline
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0);
//processor
assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
indexRequest.setPipeline("_id2"); indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats(); final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); //total
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0);
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); //pipeline
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0);
//update cluster state and ensure that new stats are added to old stats
putRequest = new PutPipelineRequest("_id1",
@@ -800,13 +822,66 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterThirdRequestStats.getTotalStats(), 3, 0, 0);
//pipeline
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0);
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//The number of processors in the "_id1" pipeline changed, so the per-processor metrics are not carried forward. This is
//due to the parallel arrays used to identify which metrics to carry forward. Without unique ids or semantic equality for each
//processor, parallel arrays are the best option for carrying metrics forward across pipeline changes. However, in some cases,
//like this one, it may not be readily obvious why the metrics were not carried forward.
assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0);
//test a failure, and that the processor stats are added from the old stats
putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"),
XContentType.JSON);
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterForthRequestStats = ingestService.stats();
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterForthRequestStats.getTotalStats(), 4, 0, 0);
//pipeline
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0);
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed
assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats
assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0);
}
public void testStatName(){
Processor processor = mock(Processor.class);
String name = randomAlphaOfLength(10);
when(processor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(processor), equalTo(name));
String tag = randomAlphaOfLength(10);
when(processor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
when(conditionalProcessor.getProcessor()).thenReturn(processor);
assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
when(pipelineProcessor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag));
}
public void testExecuteWithDrop() {
Map<String, Processor.Factory> factories = new HashMap<>();
factories.put("drop", new DropProcessor.Factory());
@@ -935,4 +1010,23 @@ public class IngestServiceTests extends ESTestCase {
return false;
}
}
private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) {
assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time);
}
private void assertPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String pipelineId, long count, long failed, long time) {
assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time);
}
private void assertStats(IngestStats.Stats stats, long count, long failed, long time) {
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
}
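Aside: the carry-forward behaviour asserted in testStats above can be pictured with a small, self-contained sketch. This is not the IngestService implementation; the names below (CarryForwardSketch, ProcessorSnapshot, carryForward) are hypothetical and only illustrate one plausible reading of the assertions: old and new processor lists are treated as parallel arrays, and an old counter is kept only when the lists have the same shape and the processor type at that slot is unchanged.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not Elasticsearch code): carry per-processor counters
// across a pipeline update by aligning the old and new processor lists positionally.
public class CarryForwardSketch {

    // Minimal stand-in for a processor type plus its accumulated ingest count.
    static final class ProcessorSnapshot {
        final String type;
        final long ingestCount;

        ProcessorSnapshot(String type, long ingestCount) {
            this.type = type;
            this.ingestCount = ingestCount;
        }
    }

    // Keep the old counter only when the lists line up and the type at this index is unchanged;
    // any other slot starts over from the fresh (zeroed) counter.
    static List<ProcessorSnapshot> carryForward(List<ProcessorSnapshot> oldList, List<ProcessorSnapshot> newList) {
        List<ProcessorSnapshot> result = new ArrayList<>(newList.size());
        boolean sameShape = oldList.size() == newList.size();
        for (int i = 0; i < newList.size(); i++) {
            ProcessorSnapshot fresh = newList.get(i);
            boolean carried = sameShape && oldList.get(i).type.equals(fresh.type);
            result.add(carried ? new ProcessorSnapshot(fresh.type, oldList.get(i).ingestCount) : fresh);
        }
        return result;
    }

    public static void main(String[] args) {
        // Old pipeline: [mock, mock], each having processed one document.
        List<ProcessorSnapshot> oldPipeline = Arrays.asList(
            new ProcessorSnapshot("mock", 1), new ProcessorSnapshot("mock", 1));
        // New pipeline: [failure-mock, mock]; index 0 changed type, index 1 did not.
        List<ProcessorSnapshot> newPipeline = Arrays.asList(
            new ProcessorSnapshot("failure-mock", 0), new ProcessorSnapshot("mock", 0));
        for (ProcessorSnapshot p : carryForward(oldPipeline, newPipeline)) {
            System.out.println(p.type + " -> " + p.ingestCount); // prints: failure-mock -> 0, mock -> 1
        }
    }
}

Requests executed after the update then increment these counters, which is consistent with the fourth-request assertions above: (1, 1, 0) for the new failure-mock at index 0 and (2, 0, 0) for the carried-forward mock at index 1.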

@@ -19,44 +19,75 @@
package org.elasticsearch.ingest;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class IngestStatsTests extends ESTestCase {
public void testSerialization() throws IOException {
//total
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
//pipeline
IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3));
IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0));
List<IngestStats.PipelineStat> pipelineStats =
Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList());
//processor
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
Map<String, List<IngestStats.ProcessorStat>> processorStats = MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
.put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat))
.map();
IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats);
IngestStats serializedStats = serialize(ingestStats);
assertNotSame(ingestStats, serializedStats);
assertNotSame(totalStats, serializedStats.getTotalStats());
assertNotSame(pipelineStats, serializedStats.getPipelineStats());
assertNotSame(processorStats, serializedStats.getProcessorStats());
assertStats(totalStats, serializedStats.getTotalStats());
assertEquals(serializedStats.getPipelineStats().size(), 3);
for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) {
assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats());
List<IngestStats.ProcessorStat> serializedProcessorStats =
serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId());
List<IngestStats.ProcessorStat> processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId());
if (processorStat != null) {
Iterator<IngestStats.ProcessorStat> it = processorStat.iterator();
//intentionally enforcing the identical ordering
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());
}
}
}
private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) {
assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount());
assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount());
assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis());
assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent());
}
private IngestStats serialize(IngestStats stats) throws IOException {
@@ -65,4 +96,8 @@ public class IngestStatsTests extends ESTestCase {
StreamInput in = out.bytes().streamInput();
return new IngestStats(in);
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
}

@@ -21,12 +21,13 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
@@ -143,15 +144,15 @@ public class PipelineProcessorTests extends ESTestCase {
pipeline2ProcessorConfig.put("pipeline", pipeline3Id);
PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
Pipeline pipeline1 = new Pipeline(
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider
);
String key1 = randomAlphaOfLength(10);
relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3));
Pipeline pipeline2 = new Pipeline(
pipeline2Id, null, null, new CompoundProcessor(true,
Arrays.asList(
@@ -160,15 +161,15 @@ public class PipelineProcessorTests extends ESTestCase {
}),
pipeline2Processor),
Collections.emptyList()),
relativeTimeProvider
);
relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2));
Pipeline pipeline3 = new Pipeline(
pipeline3Id, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> {
throw new RuntimeException("error");
})), relativeTimeProvider
);
when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);