Revert "ingest: processor stats (#34202)"

This reverts commit 6567729600.
This commit is contained in:
Jason Tedor 2018-10-21 13:16:15 -04:00
parent bf5f0af491
commit 0577703183
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
12 changed files with 146 additions and 652 deletions

View File

@ -20,15 +20,12 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -43,33 +40,16 @@ public class CompoundProcessor implements Processor {
private final boolean ignoreFailure; private final boolean ignoreFailure;
private final List<Processor> processors; private final List<Processor> processors;
private final List<Processor> onFailureProcessors; private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
}
public CompoundProcessor(Processor... processor) { public CompoundProcessor(Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList()); this(false, Arrays.asList(processor), Collections.emptyList());
} }
public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) { public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
}
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
LongSupplier relativeTimeProvider) {
super(); super();
this.ignoreFailure = ignoreFailure; this.ignoreFailure = ignoreFailure;
this.processors = processors; this.processors = processors;
this.onFailureProcessors = onFailureProcessors; this.onFailureProcessors = onFailureProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
}
List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
return processorsWithMetrics;
} }
public boolean isIgnoreFailure() { public boolean isIgnoreFailure() {
@ -114,17 +94,12 @@ public class CompoundProcessor implements Processor {
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) { for (Processor processor : processors) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try { try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) { if (processor.execute(ingestDocument) == null) {
return null; return null;
} }
} catch (Exception e) { } catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) { if (ignoreFailure) {
continue; continue;
} }
@ -137,15 +112,11 @@ public class CompoundProcessor implements Processor {
executeOnFailure(ingestDocument, compoundProcessorException); executeOnFailure(ingestDocument, compoundProcessorException);
break; break;
} }
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
} }
} }
return ingestDocument; return ingestDocument;
} }
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try { try {
putFailureMetadata(ingestDocument, exception); putFailureMetadata(ingestDocument, exception);

View File

@ -28,8 +28,6 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
@ -44,20 +42,12 @@ public class ConditionalProcessor extends AbstractProcessor {
private final ScriptService scriptService; private final ScriptService scriptService;
private final Processor processor; private final Processor processor;
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
this(tag, script, scriptService, processor, System::nanoTime);
}
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
super(tag); super(tag);
this.condition = script; this.condition = script;
this.scriptService = scriptService; this.scriptService = scriptService;
this.processor = processor; this.processor = processor;
this.metric = new IngestMetric();
this.relativeTimeProvider = relativeTimeProvider;
} }
@Override @Override
@ -65,30 +55,11 @@ public class ConditionalProcessor extends AbstractProcessor {
IngestConditionalScript script = IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
// Only record metric if the script evaluates to true return processor.execute(ingestDocument);
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
return processor.execute(ingestDocument);
} catch (Exception e) {
metric.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
} }
return ingestDocument; return ingestDocument;
} }
Processor getProcessor() {
return processor;
}
IngestMetric getMetric() {
return metric;
}
@Override @Override
public String getType() { public String getType() {
return TYPE; return TYPE;

View File

@ -19,6 +19,19 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
@ -36,7 +49,6 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -49,19 +61,6 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/** /**
* Holder class for several ingest related services. * Holder class for several ingest related services.
*/ */
@ -263,59 +262,11 @@ public class IngestService implements ClusterStateApplier {
Pipeline originalPipeline = originalPipelines.get(id); Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) { if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics()); pipeline.getMetrics().add(originalPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
} }
}); });
} }
} }
/**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
* @param compoundProcessor The compound processor to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
*/
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
List<Tuple<Processor, IngestMetric>> processorMetrics) {
//only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
if (processor instanceof CompoundProcessor) {
getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
} else {
//Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor) {
metric = ((ConditionalProcessor) processor).getMetric();
}
processorMetrics.add(new Tuple<>(processor, metric));
}
}
return processorMetrics;
}
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
@ -420,42 +371,11 @@ public class IngestService implements ClusterStateApplier {
} }
public IngestStats stats() { public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.forEach((id, pipeline) -> {
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
getProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
});
});
return statsBuilder.build();
}
//package private for testing Map<String, IngestStats.Stats> statsPerPipeline =
static String getProcessorName(Processor processor){ pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
if(processor instanceof PipelineProcessor){ return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
sb.append(":");
sb.append(pipelineName);
}
String tag = processor.getTag();
if(tag != null && !tag.isEmpty()){
sb.append(":");
sb.append(tag);
}
return sb.toString();
} }
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception { private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {

View File

@ -27,28 +27,17 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class IngestStats implements Writeable, ToXContentFragment { public class IngestStats implements Writeable, ToXContentFragment {
private final Stats totalStats; private final Stats totalStats;
private final List<PipelineStat> pipelineStats; private final Map<String, Stats> statsPerPipeline;
private final Map<String, List<ProcessorStat>> processorStats;
/** public IngestStats(Stats totalStats, Map<String, Stats> statsPerPipeline) {
* @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats,
* and pipeline stats are logically the sum of the processor stats.
* @param pipelineStats - The stats for a given ingest pipeline.
* @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier.
*/
public IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats) {
this.totalStats = totalStats; this.totalStats = totalStats;
this.pipelineStats = pipelineStats; this.statsPerPipeline = statsPerPipeline;
this.processorStats = processorStats;
} }
/** /**
@ -57,43 +46,37 @@ public class IngestStats implements Writeable, ToXContentFragment {
public IngestStats(StreamInput in) throws IOException { public IngestStats(StreamInput in) throws IOException {
this.totalStats = new Stats(in); this.totalStats = new Stats(in);
int size = in.readVInt(); int size = in.readVInt();
this.pipelineStats = new ArrayList<>(size); this.statsPerPipeline = new HashMap<>(size);
this.processorStats = new HashMap<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
String pipelineId = in.readString(); statsPerPipeline.put(in.readString(), new Stats(in));
Stats pipelineStat = new Stats(in);
this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat));
int processorsSize = in.readVInt();
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
} }
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
totalStats.writeTo(out); totalStats.writeTo(out);
out.writeVInt(pipelineStats.size()); out.writeVInt(statsPerPipeline.size());
for (PipelineStat pipelineStat : pipelineStats) { for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
out.writeString(pipelineStat.getPipelineId()); out.writeString(entry.getKey());
pipelineStat.getStats().writeTo(out); entry.getValue().writeTo(out);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
if(processorStatsForPipeline == null) {
out.writeVInt(0);
}else{
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
processorStat.getStats().writeTo(out);
}
}
} }
} }
/**
* @return The accumulated stats for all pipelines
*/
public Stats getTotalStats() {
return totalStats;
}
/**
* @return The stats on a per pipeline basis
*/
public Map<String, Stats> getStatsPerPipeline() {
return statsPerPipeline;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("ingest"); builder.startObject("ingest");
@ -101,21 +84,9 @@ public class IngestStats implements Writeable, ToXContentFragment {
totalStats.toXContent(builder, params); totalStats.toXContent(builder, params);
builder.endObject(); builder.endObject();
builder.startObject("pipelines"); builder.startObject("pipelines");
for (PipelineStat pipelineStat : pipelineStats) { for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
builder.startObject(pipelineStat.getPipelineId()); builder.startObject(entry.getKey());
pipelineStat.getStats().toXContent(builder, params); entry.getValue().toXContent(builder, params);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
builder.startArray("processors");
if (processorStatsForPipeline != null) {
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
}
}
builder.endArray();
builder.endObject(); builder.endObject();
} }
builder.endObject(); builder.endObject();
@ -123,18 +94,6 @@ public class IngestStats implements Writeable, ToXContentFragment {
return builder; return builder;
} }
public Stats getTotalStats() {
return totalStats;
}
public List<PipelineStat> getPipelineStats() {
return pipelineStats;
}
public Map<String, List<ProcessorStat>> getProcessorStats() {
return processorStats;
}
public static class Stats implements Writeable, ToXContentFragment { public static class Stats implements Writeable, ToXContentFragment {
private final long ingestCount; private final long ingestCount;
@ -175,6 +134,7 @@ public class IngestStats implements Writeable, ToXContentFragment {
} }
/** /**
*
* @return The total time spent of ingest preprocessing in millis. * @return The total time spent of ingest preprocessing in millis.
*/ */
public long getIngestTimeInMillis() { public long getIngestTimeInMillis() {
@ -204,77 +164,4 @@ public class IngestStats implements Writeable, ToXContentFragment {
return builder; return builder;
} }
} }
/**
* Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects
*/
static class Builder {
private Stats totalStats;
private List<PipelineStat> pipelineStats = new ArrayList<>();
private Map<String, List<ProcessorStat>> processorStats = new HashMap<>();
Builder addTotalMetrics(IngestMetric totalMetric) {
this.totalStats = totalMetric.createStats();
return this;
}
Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats()));
return this;
}
Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
return this;
}
IngestStats build() {
return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats),
Collections.unmodifiableMap(processorStats));
}
}
/**
* Container for pipeline stats.
*/
public static class PipelineStat {
private final String pipelineId;
private final Stats stats;
public PipelineStat(String pipelineId, Stats stats) {
this.pipelineId = pipelineId;
this.stats = stats;
}
public String getPipelineId() {
return pipelineId;
}
public Stats getStats() {
return stats;
}
}
/**
* Container for processor stats.
*/
public static class ProcessorStat {
private final String name;
private final Stats stats;
public ProcessorStat(String name, Stats stats) {
this.name = name;
this.stats = stats;
}
public String getName() {
return name;
}
public Stats getStats() {
return stats;
}
}
} }

View File

@ -22,12 +22,11 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import java.time.Clock;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
@ -48,21 +47,20 @@ public final class Pipeline {
private final Integer version; private final Integer version;
private final CompoundProcessor compoundProcessor; private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics; private final IngestMetric metrics;
private final LongSupplier relativeTimeProvider; private final Clock clock;
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this(id, description, version, compoundProcessor, System::nanoTime); this(id, description, version, compoundProcessor, Clock.systemUTC());
} }
//package private for testing //package private for testing
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
LongSupplier relativeTimeProvider) {
this.id = id; this.id = id;
this.description = description; this.description = description;
this.compoundProcessor = compoundProcessor; this.compoundProcessor = compoundProcessor;
this.version = version; this.version = version;
this.metrics = new IngestMetric(); this.metrics = new IngestMetric();
this.relativeTimeProvider = relativeTimeProvider; this.clock = clock;
} }
public static Pipeline create(String id, Map<String, Object> config, public static Pipeline create(String id, Map<String, Object> config,
@ -91,7 +89,7 @@ public final class Pipeline {
* Modifies the data of a document to be indexed based on the processor this pipeline holds * Modifies the data of a document to be indexed based on the processor this pipeline holds
*/ */
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
long startTimeInNanos = relativeTimeProvider.getAsLong(); long startTimeInMillis = clock.millis();
try { try {
metrics.preIngest(); metrics.preIngest();
return compoundProcessor.execute(ingestDocument); return compoundProcessor.execute(ingestDocument);
@ -99,7 +97,7 @@ public final class Pipeline {
metrics.ingestFailed(); metrics.ingestFailed();
throw e; throw e;
} finally { } finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); long ingestTimeInMillis = clock.millis() - startTimeInMillis;
metrics.postIngest(ingestTimeInMillis); metrics.postIngest(ingestTimeInMillis);
} }
} }

View File

@ -53,10 +53,6 @@ public class PipelineProcessor extends AbstractProcessor {
return TYPE; return TYPE;
} }
String getPipelineName() {
return pipelineName;
}
public static final class Factory implements Processor.Factory { public static final class Factory implements Processor.Factory {
private final IngestService ingestService; private final IngestService ingestService;

View File

@ -53,6 +53,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
public class NodeStatsTests extends ESTestCase { public class NodeStatsTests extends ESTestCase {
public void testSerialization() throws IOException { public void testSerialization() throws IOException {
NodeStats nodeStats = createNodeStats(); NodeStats nodeStats = createNodeStats();
try (BytesStreamOutput out = new BytesStreamOutput()) { try (BytesStreamOutput out = new BytesStreamOutput()) {
@ -270,29 +271,14 @@ public class NodeStatsTests extends ESTestCase {
assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent());
assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount());
assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis());
assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size());
for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { for (Map.Entry<String, IngestStats.Stats> entry : ingestStats.getStatsPerPipeline().entrySet()) {
String pipelineId = pipelineStat.getPipelineId(); IngestStats.Stats stats = entry.getValue();
IngestStats.Stats deserializedPipelineStats = IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey());
getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount());
assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis());
assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent());
assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount());
assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount());
List<IngestStats.ProcessorStat> processorStats = ingestStats.getProcessorStats().get(pipelineId);
//intentionally validating identical order
Iterator<IngestStats.ProcessorStat> it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator();
for (IngestStats.ProcessorStat processorStat : processorStats) {
IngestStats.ProcessorStat deserializedProcessorStat = it.next();
assertEquals(processorStat.getStats().getIngestFailedCount(),
deserializedProcessorStat.getStats().getIngestFailedCount());
assertEquals(processorStat.getStats().getIngestTimeInMillis(),
deserializedProcessorStat.getStats().getIngestTimeInMillis());
assertEquals(processorStat.getStats().getIngestCurrent(),
deserializedProcessorStat.getStats().getIngestCurrent());
assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount());
}
assertFalse(it.hasNext());
} }
} }
AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats();
@ -443,24 +429,14 @@ public class NodeStatsTests extends ESTestCase {
if (frequently()) { if (frequently()) {
IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong()); randomNonNegativeLong());
int numPipelines = randomIntBetween(0, 10);
int numProcessors = randomIntBetween(0, 10);
List<IngestStats.PipelineStat> ingestPipelineStats = new ArrayList<>(numPipelines);
Map<String, List<IngestStats.ProcessorStat>> ingestProcessorStats = new HashMap<>(numPipelines);
for (int i = 0; i < numPipelines; i++) {
String pipelineId = randomAlphaOfLengthBetween(3, 10);
ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())));
List<IngestStats.ProcessorStat> processorPerPipeline = new ArrayList<>(numProcessors); int numStatsPerPipeline = randomIntBetween(0, 10);
for (int j =0; j < numProcessors;j++) { Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>();
IngestStats.Stats processorStats = new IngestStats.Stats for (int i = 0; i < numStatsPerPipeline; i++) {
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(),
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
} }
ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); ingestStats = new IngestStats(totalStats, statsPerPipeline);
} }
AdaptiveSelectionStats adaptiveSelectionStats = null; AdaptiveSelectionStats adaptiveSelectionStats = null;
if (frequently()) { if (frequently()) {
@ -489,8 +465,4 @@ public class NodeStatsTests extends ESTestCase {
fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
ingestStats, adaptiveSelectionStats); ingestStats, adaptiveSelectionStats);
} }
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
} }

View File

@ -27,17 +27,11 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CompoundProcessorTests extends ESTestCase { public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument; private IngestDocument ingestDocument;
@ -55,29 +49,18 @@ public class CompoundProcessorTests extends ESTestCase {
} }
public void testSingleProcessor() throws Exception { public void testSingleProcessor() throws Exception {
LongSupplier relativeTimeProvider = mock(LongSupplier.class); TestProcessor processor = new TestProcessor(ingestDocument -> {});
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
TestProcessor processor = new TestProcessor(ingestDocument ->{
assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0);
});
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 0, 1);
} }
public void testSingleProcessorWithException() throws Exception { public void testSingleProcessorWithException() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
LongSupplier relativeTimeProvider = mock(LongSupplier.class); CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
@ -88,22 +71,15 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(e.getRootCause().getMessage(), equalTo("error")); assertThat(e.getRootCause().getMessage(), equalTo("error"));
} }
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
} }
public void testIgnoreFailure() throws Exception { public void testIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
LongSupplier relativeTimeProvider = mock(LongSupplier.class); CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList());
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor =
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1));
assertStats(1, compoundProcessor, 0, 1, 0, 0);
assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value"));
} }
@ -117,15 +93,11 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
}); });
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider); Collections.singletonList(processor2));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 1);
assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1));
} }
@ -146,17 +118,14 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
}); });
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
Collections.singletonList(lastProcessor), relativeTimeProvider); Collections.singletonList(lastProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); Collections.singletonList(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
} }
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
@ -168,24 +137,21 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
}); });
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
} }
public void testCompoundProcessorExceptionFail() throws Exception { public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3)); assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -194,24 +160,21 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
}); });
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor), relativeTimeProvider); Collections.singletonList(failProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
} }
public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3)); assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -220,44 +183,27 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
}); });
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); Collections.singletonList(new CompoundProcessor(failProcessor)));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), relativeTimeProvider); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
} }
public void testBreakOnFailure() throws Exception { public void testBreakOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");}); TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");}); TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {}); TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
Collections.singletonList(onFailureProcessor), relativeTimeProvider); Collections.singletonList(onFailureProcessor));
pipeline.execute(ingestDocument); pipeline.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
assertStats(pipeline, 1, 1, 0);
}
private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
assertStats(0, compoundProcessor, 0L, count, failed, time);
}
private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) {
IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats();
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(current));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), equalTo(time));
} }
} }

View File

@ -33,18 +33,12 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConditionalProcessorTests extends ESTestCase { public class ConditionalProcessorTests extends ESTestCase {
@ -66,8 +60,6 @@ public class ConditionalProcessorTests extends ESTestCase {
new HashMap<>(ScriptModule.CORE_CONTEXTS) new HashMap<>(ScriptModule.CORE_CONTEXTS)
); );
Map<String, Object> document = new HashMap<>(); Map<String, Object> document = new HashMap<>();
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2));
ConditionalProcessor processor = new ConditionalProcessor( ConditionalProcessor processor = new ConditionalProcessor(
randomAlphaOfLength(10), randomAlphaOfLength(10),
new Script( new Script(
@ -75,10 +67,7 @@ public class ConditionalProcessorTests extends ESTestCase {
scriptName, Collections.emptyMap()), scriptService, scriptName, Collections.emptyMap()), scriptService,
new Processor() { new Processor() {
@Override @Override
public IngestDocument execute(final IngestDocument ingestDocument){ public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
if(ingestDocument.hasField("error")){
throw new RuntimeException("error");
}
ingestDocument.setFieldValue("foo", "bar"); ingestDocument.setFieldValue("foo", "bar");
return ingestDocument; return ingestDocument;
} }
@ -92,37 +81,20 @@ public class ConditionalProcessorTests extends ESTestCase {
public String getTag() { public String getTag() {
return null; return null;
} }
}, relativeTimeProvider); });
//false, never call processor never increments metrics
String falseValue = "falsy";
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true);
processor.execute(ingestDocument);
assertStats(processor, 0, 0, 0);
//true, always call processor and increments metrics
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue(conditionalField, trueValue);
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue));
assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar"));
assertStats(processor, 1, 0, 1);
String falseValue = "falsy";
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue(conditionalField, falseValue);
ingestDocument.setFieldValue("error", true); processor.execute(ingestDocument);
IngestDocument finalIngestDocument = ingestDocument; assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 2, 1, 2);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -169,14 +141,5 @@ public class ConditionalProcessorTests extends ESTestCase {
Exception e = expectedException.get(); Exception e = expectedException.get();
assertThat(e, instanceOf(UnsupportedOperationException.class)); assertThat(e, instanceOf(UnsupportedOperationException.class));
assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage());
assertStats(processor, 0, 0, 0);
}
private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) {
IngestStats.Stats stats = conditionalProcessor.getMetric().createStats();
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
} }
} }

View File

@ -63,7 +63,6 @@ import java.util.function.Consumer;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -747,23 +746,16 @@ public class IngestServiceTests extends ESTestCase {
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(null);
} }
public void testStats() throws Exception { public void testStats() {
final Processor processor = mock(Processor.class); final Processor processor = mock(Processor.class);
final Processor processorFailure = mock(Processor.class); IngestService ingestService = createWithProcessors(Collections.singletonMap(
when(processor.getType()).thenReturn("mock"); "mock", (factories, tag, config) -> processor));
when(processor.getTag()).thenReturn("mockTag");
when(processorFailure.getType()).thenReturn("failure-mock");
//avoid returning null and dropping the document
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error"));
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
map.put("failure-mock", (factories, tag, config) -> processorFailure);
IngestService ingestService = createWithProcessors(map);
final IngestStats initialStats = ingestService.stats(); final IngestStats initialStats = ingestService.stats();
assertThat(initialStats.getPipelineStats().size(), equalTo(0)); assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0));
assertStats(initialStats.getTotalStats(), 0, 0, 0); assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
PutPipelineRequest putRequest = new PutPipelineRequest("_id1", PutPipelineRequest putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
@ -777,6 +769,7 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class); @SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
@ -785,33 +778,18 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats(); final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L));
afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L));
//total
assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0);
//pipeline
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0);
//processor
assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
indexRequest.setPipeline("_id2"); indexRequest.setPipeline("_id2");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterSecondRequestStats = ingestService.stats(); final IngestStats afterSecondRequestStats = ingestService.stats();
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2));
//total assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
//pipeline assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0);
//update cluster state and ensure that new stats are added to old stats //update cluster state and ensure that new stats are added to old stats
putRequest = new PutPipelineRequest("_id1", putRequest = new PutPipelineRequest("_id1",
@ -822,66 +800,13 @@ public class IngestServiceTests extends ESTestCase {
indexRequest.setPipeline("_id1"); indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats(); final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2));
//total assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L));
assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
//pipeline assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L));
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0);
assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is
//due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each
//processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases,
//like this one it may not readily obvious why the metrics were not carried forward.
assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0);
//test a failure, and that the processor stats are added from the old stats
putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"),
XContentType.JSON);
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterForthRequestStats = ingestService.stats();
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
//total
assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0);
//pipeline
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0);
assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0);
//processor
assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed
assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats
assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0);
} }
public void testStatName(){
Processor processor = mock(Processor.class);
String name = randomAlphaOfLength(10);
when(processor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(processor), equalTo(name));
String tag = randomAlphaOfLength(10);
when(processor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
when(conditionalProcessor.getProcessor()).thenReturn(processor);
assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
when(pipelineProcessor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag));
}
public void testExecuteWithDrop() { public void testExecuteWithDrop() {
Map<String, Processor.Factory> factories = new HashMap<>(); Map<String, Processor.Factory> factories = new HashMap<>();
factories.put("drop", new DropProcessor.Factory()); factories.put("drop", new DropProcessor.Factory());
@ -1010,23 +935,4 @@ public class IngestServiceTests extends ESTestCase {
return false; return false;
} }
} }
private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) {
assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time);
}
private void assertPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String pipelineId, long count, long failed, long time) {
assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time);
}
private void assertStats(IngestStats.Stats stats, long count, long failed, long time) {
assertThat(stats.getIngestCount(), equalTo(count));
assertThat(stats.getIngestCurrent(), equalTo(0L));
assertThat(stats.getIngestFailedCount(), equalTo(failed));
assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time));
}
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
} }

View File

@ -19,75 +19,44 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class IngestStatsTests extends ESTestCase { public class IngestStatsTests extends ESTestCase {
public void testSerialization() throws IOException { public void testSerialization() throws IOException {
//total IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30);
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300);
//pipeline IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo));
IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); IngestStats serialize = serialize(ingestStats);
IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); assertNotSame(serialize, ingestStats);
IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); assertNotSame(serialize.getTotalStats(), total);
List<IngestStats.PipelineStat> pipelineStats = assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount());
Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount());
//processor assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis());
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent());
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
Map<String, List<IngestStats.ProcessorStat>> processorStats = MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
.put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat))
.map();
IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); assertEquals(ingestStats.getStatsPerPipeline().size(), 1);
assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo"));
IngestStats serializedStats = serialize(ingestStats); Map<String, IngestStats.Stats> left = ingestStats.getStatsPerPipeline();
assertNotSame(ingestStats, serializedStats); Map<String, IngestStats.Stats> right = serialize.getStatsPerPipeline();
assertNotSame(totalStats, serializedStats.getTotalStats());
assertNotSame(pipelineStats, serializedStats.getPipelineStats());
assertNotSame(processorStats, serializedStats.getProcessorStats());
assertStats(totalStats, serializedStats.getTotalStats()); assertEquals(right.size(), 1);
assertEquals(serializedStats.getPipelineStats().size(), 3); assertTrue(right.containsKey("foo"));
assertEquals(left.size(), 1);
for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { assertTrue(left.containsKey("foo"));
assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); IngestStats.Stats leftStats = left.get("foo");
List<IngestStats.ProcessorStat> serializedProcessorStats = IngestStats.Stats rightStats = right.get("foo");
serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount());
List<IngestStats.ProcessorStat> processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount());
if(processorStat != null) { assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis());
Iterator<IngestStats.ProcessorStat> it = processorStat.iterator(); assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent());
//intentionally enforcing the identical ordering
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());
}
}
}
private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) {
assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount());
assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount());
assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis());
assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent());
} }
private IngestStats serialize(IngestStats stats) throws IOException { private IngestStats serialize(IngestStats stats) throws IOException {
@ -96,8 +65,4 @@ public class IngestStatsTests extends ESTestCase {
StreamInput in = out.bytes().streamInput(); StreamInput in = out.bytes().streamInput();
return new IngestStats(in); return new IngestStats(in);
} }
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
} }

View File

@ -21,13 +21,12 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.time.Clock;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -144,15 +143,15 @@ public class PipelineProcessorTests extends ESTestCase {
pipeline2ProcessorConfig.put("pipeline", pipeline3Id); pipeline2ProcessorConfig.put("pipeline", pipeline3Id);
PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
LongSupplier relativeTimeProvider = mock(LongSupplier.class); Clock clock = mock(Clock.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L); when(clock.millis()).thenReturn(0L).thenReturn(0L);
Pipeline pipeline1 = new Pipeline( Pipeline pipeline1 = new Pipeline(
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock
); );
String key1 = randomAlphaOfLength(10); String key1 = randomAlphaOfLength(10);
relativeTimeProvider = mock(LongSupplier.class); clock = mock(Clock.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3)); when(clock.millis()).thenReturn(0L).thenReturn(3L);
Pipeline pipeline2 = new Pipeline( Pipeline pipeline2 = new Pipeline(
pipeline2Id, null, null, new CompoundProcessor(true, pipeline2Id, null, null, new CompoundProcessor(true,
Arrays.asList( Arrays.asList(
@ -161,15 +160,15 @@ public class PipelineProcessorTests extends ESTestCase {
}), }),
pipeline2Processor), pipeline2Processor),
Collections.emptyList()), Collections.emptyList()),
relativeTimeProvider clock
); );
relativeTimeProvider = mock(LongSupplier.class); clock = mock(Clock.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); when(clock.millis()).thenReturn(0L).thenReturn(2L);
Pipeline pipeline3 = new Pipeline( Pipeline pipeline3 = new Pipeline(
pipeline3Id, null, null, new CompoundProcessor( pipeline3Id, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> { new TestProcessor(ingestDocument -> {
throw new RuntimeException("error"); throw new RuntimeException("error");
})), relativeTimeProvider })), clock
); );
when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);