ingest: correctly measure chained pipeline stats (#33912)
Prior to this change when a pipeline processor called another pipeline, only the stats for the first processor were recorded. The stats for the subsequent pipelines were ignored. This change properly accounts for pipelines irregardless if they are the first or subsequently called pipelines. This change moves the state of the stats from the IngestService to the pipeline itself. Cluster updates are safe since the pipelines map is atomically swapped, and if a cluster update happens while iterating over stats (now read directly from the pipeline) a slightly stale view of stats may be shown.
This commit is contained in:
parent
37be3e713c
commit
73ee721b29
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
|
import org.elasticsearch.common.metrics.MeanMetric;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Metrics to measure ingest actions.
|
||||||
|
* <p>This counts measure documents and timings for a given scope.
|
||||||
|
* The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline,
|
||||||
|
* or you can use this class to count documents for a given pipeline or a specific processor.
|
||||||
|
* This class does not make assumptions about it's given scope.
|
||||||
|
*/
|
||||||
|
class IngestMetric {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time it takes to complete the measured item.
|
||||||
|
*/
|
||||||
|
private final MeanMetric ingestTime = new MeanMetric();
|
||||||
|
/**
|
||||||
|
* The current count of things being measure. Should most likely ever be 0 or 1.
|
||||||
|
* Useful when aggregating multiple metrics to see how many things are in flight.
|
||||||
|
*/
|
||||||
|
private final CounterMetric ingestCurrent = new CounterMetric();
|
||||||
|
/**
|
||||||
|
* The ever increasing count of things being measured
|
||||||
|
*/
|
||||||
|
private final CounterMetric ingestCount = new CounterMetric();
|
||||||
|
/**
|
||||||
|
* The only increasing count of failures
|
||||||
|
*/
|
||||||
|
private final CounterMetric ingestFailed = new CounterMetric();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this prior to the ingest action.
|
||||||
|
*/
|
||||||
|
void preIngest() {
|
||||||
|
ingestCurrent.inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this after the performing the ingest action, even if the action failed.
|
||||||
|
* @param ingestTimeInMillis The time it took to perform the action.
|
||||||
|
*/
|
||||||
|
void postIngest(long ingestTimeInMillis) {
|
||||||
|
ingestCurrent.dec();
|
||||||
|
ingestTime.inc(ingestTimeInMillis);
|
||||||
|
ingestCount.inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this if the ingest action failed.
|
||||||
|
*/
|
||||||
|
void ingestFailed() {
|
||||||
|
ingestFailed.inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Add two sets of metrics together.
|
||||||
|
* <p><strong>Note -</strong> this method does <strong>not</strong> add the current count values.
|
||||||
|
* The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct.
|
||||||
|
*
|
||||||
|
* @param metrics The metric to add.
|
||||||
|
*/
|
||||||
|
void add(IngestMetric metrics) {
|
||||||
|
ingestCount.inc(metrics.ingestCount.count());
|
||||||
|
ingestTime.inc(metrics.ingestTime.sum());
|
||||||
|
ingestFailed.inc(metrics.ingestFailed.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a serializable representation for these metrics.
|
||||||
|
*/
|
||||||
|
IngestStats.Stats createStats() {
|
||||||
|
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count());
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,15 +23,15 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
import org.elasticsearch.ResourceNotFoundException;
|
||||||
|
@ -49,8 +49,6 @@ import org.elasticsearch.cluster.ClusterStateApplier;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.metrics.CounterMetric;
|
|
||||||
import org.elasticsearch.common.metrics.MeanMetric;
|
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
@ -79,8 +77,7 @@ public class IngestService implements ClusterStateApplier {
|
||||||
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
|
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
|
||||||
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
|
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final StatsHolder totalStats = new StatsHolder();
|
private final IngestMetric totalMetrics = new IngestMetric();
|
||||||
private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
|
|
||||||
|
|
||||||
public IngestService(ClusterService clusterService, ThreadPool threadPool,
|
public IngestService(ClusterService clusterService, ThreadPool threadPool,
|
||||||
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
|
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
|
||||||
|
@ -257,10 +254,16 @@ public class IngestService implements ClusterStateApplier {
|
||||||
@Override
|
@Override
|
||||||
public void applyClusterState(final ClusterChangedEvent event) {
|
public void applyClusterState(final ClusterChangedEvent event) {
|
||||||
ClusterState state = event.state();
|
ClusterState state = event.state();
|
||||||
|
Map<String, Pipeline> originalPipelines = pipelines;
|
||||||
innerUpdatePipelines(event.previousState(), state);
|
innerUpdatePipelines(event.previousState(), state);
|
||||||
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
|
//pipelines changed, so add the old metrics to the new metrics
|
||||||
if (ingestMetadata != null) {
|
if (originalPipelines != pipelines) {
|
||||||
updatePipelineStats(ingestMetadata);
|
pipelines.forEach((id, pipeline) -> {
|
||||||
|
Pipeline originalPipeline = originalPipelines.get(id);
|
||||||
|
if (originalPipeline != null) {
|
||||||
|
pipeline.getMetrics().add(originalPipeline.getMetrics());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,6 +328,7 @@ public class IngestService implements ClusterStateApplier {
|
||||||
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
|
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
|
||||||
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
|
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
|
||||||
Consumer<IndexRequest> itemDroppedHandler) {
|
Consumer<IndexRequest> itemDroppedHandler) {
|
||||||
|
|
||||||
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
|
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -367,37 +371,11 @@ public class IngestService implements ClusterStateApplier {
|
||||||
}
|
}
|
||||||
|
|
||||||
public IngestStats stats() {
|
public IngestStats stats() {
|
||||||
Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;
|
|
||||||
|
|
||||||
Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
|
Map<String, IngestStats.Stats> statsPerPipeline =
|
||||||
for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
|
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
|
||||||
statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
|
|
||||||
}
|
|
||||||
|
|
||||||
return new IngestStats(totalStats.createStats(), statsPerPipeline);
|
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
|
||||||
}
|
|
||||||
|
|
||||||
void updatePipelineStats(IngestMetadata ingestMetadata) {
|
|
||||||
boolean changed = false;
|
|
||||||
Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
|
|
||||||
Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
String pipeline = iterator.next();
|
|
||||||
if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
|
|
||||||
iterator.remove();
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (String pipeline : ingestMetadata.getPipelines().keySet()) {
|
|
||||||
if (newStatsPerPipeline.containsKey(pipeline) == false) {
|
|
||||||
newStatsPerPipeline.put(pipeline, new StatsHolder());
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (changed) {
|
|
||||||
statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
|
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
|
||||||
|
@ -408,10 +386,8 @@ public class IngestService implements ClusterStateApplier {
|
||||||
long startTimeInNanos = System.nanoTime();
|
long startTimeInNanos = System.nanoTime();
|
||||||
// the pipeline specific stat holder may not exist and that is fine:
|
// the pipeline specific stat holder may not exist and that is fine:
|
||||||
// (e.g. the pipeline may have been removed while we're ingesting a document
|
// (e.g. the pipeline may have been removed while we're ingesting a document
|
||||||
Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
|
|
||||||
try {
|
try {
|
||||||
totalStats.preIngest();
|
totalMetrics.preIngest();
|
||||||
pipelineStats.ifPresent(StatsHolder::preIngest);
|
|
||||||
String index = indexRequest.index();
|
String index = indexRequest.index();
|
||||||
String type = indexRequest.type();
|
String type = indexRequest.type();
|
||||||
String id = indexRequest.id();
|
String id = indexRequest.id();
|
||||||
|
@ -437,13 +413,11 @@ public class IngestService implements ClusterStateApplier {
|
||||||
indexRequest.source(ingestDocument.getSourceAndMetadata());
|
indexRequest.source(ingestDocument.getSourceAndMetadata());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
totalStats.ingestFailed();
|
totalMetrics.ingestFailed();
|
||||||
pipelineStats.ifPresent(StatsHolder::ingestFailed);
|
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
|
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
|
||||||
totalStats.postIngest(ingestTimeInMillis);
|
totalMetrics.postIngest(ingestTimeInMillis);
|
||||||
pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,27 +454,4 @@ public class IngestService implements ClusterStateApplier {
|
||||||
ExceptionsHelper.rethrowAndSuppress(exceptions);
|
ExceptionsHelper.rethrowAndSuppress(exceptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StatsHolder {
|
|
||||||
|
|
||||||
private final MeanMetric ingestMetric = new MeanMetric();
|
|
||||||
private final CounterMetric ingestCurrent = new CounterMetric();
|
|
||||||
private final CounterMetric ingestFailed = new CounterMetric();
|
|
||||||
|
|
||||||
void preIngest() {
|
|
||||||
ingestCurrent.inc();
|
|
||||||
}
|
|
||||||
|
|
||||||
void postIngest(long ingestTimeInMillis) {
|
|
||||||
ingestCurrent.dec();
|
|
||||||
ingestMetric.inc(ingestTimeInMillis);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ingestFailed() {
|
|
||||||
ingestFailed.inc();
|
|
||||||
}
|
|
||||||
|
|
||||||
IngestStats.Stats createStats() {
|
|
||||||
return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,12 @@ package org.elasticsearch.ingest;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
|
|
||||||
|
import java.time.Clock;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,12 +46,21 @@ public final class Pipeline {
|
||||||
@Nullable
|
@Nullable
|
||||||
private final Integer version;
|
private final Integer version;
|
||||||
private final CompoundProcessor compoundProcessor;
|
private final CompoundProcessor compoundProcessor;
|
||||||
|
private final IngestMetric metrics;
|
||||||
|
private final Clock clock;
|
||||||
|
|
||||||
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
|
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
|
||||||
|
this(id, description, version, compoundProcessor, Clock.systemUTC());
|
||||||
|
}
|
||||||
|
|
||||||
|
//package private for testing
|
||||||
|
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.description = description;
|
this.description = description;
|
||||||
this.compoundProcessor = compoundProcessor;
|
this.compoundProcessor = compoundProcessor;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
|
this.metrics = new IngestMetric();
|
||||||
|
this.clock = clock;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Pipeline create(String id, Map<String, Object> config,
|
public static Pipeline create(String id, Map<String, Object> config,
|
||||||
|
@ -78,7 +89,17 @@ public final class Pipeline {
|
||||||
* Modifies the data of a document to be indexed based on the processor this pipeline holds
|
* Modifies the data of a document to be indexed based on the processor this pipeline holds
|
||||||
*/
|
*/
|
||||||
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
||||||
|
long startTimeInMillis = clock.millis();
|
||||||
|
try {
|
||||||
|
metrics.preIngest();
|
||||||
return compoundProcessor.execute(ingestDocument);
|
return compoundProcessor.execute(ingestDocument);
|
||||||
|
} catch (Exception e) {
|
||||||
|
metrics.ingestFailed();
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
|
||||||
|
metrics.postIngest(ingestTimeInMillis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -136,4 +157,10 @@ public final class Pipeline {
|
||||||
return compoundProcessor.flattenProcessors();
|
return compoundProcessor.flattenProcessors();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metrics associated with this pipeline.
|
||||||
|
*/
|
||||||
|
public IngestMetric getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,16 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.function.BiConsumer;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
|
@ -59,13 +49,22 @@ import org.hamcrest.CustomTypeSafeMatcher;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.hasKey;
|
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.not;
|
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.Matchers.sameInstance;
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
|
@ -769,16 +768,14 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
clusterState = IngestService.innerPut(putRequest, clusterState);
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
||||||
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
||||||
final Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
|
|
||||||
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
|
|
||||||
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
|
|
||||||
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
||||||
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
|
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
|
||||||
|
|
||||||
final IndexRequest indexRequest = new IndexRequest("_index");
|
final IndexRequest indexRequest = new IndexRequest("_index");
|
||||||
indexRequest.setPipeline("_id1");
|
indexRequest.setPipeline("_id1");
|
||||||
|
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||||
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||||
final IngestStats afterFirstRequestStats = ingestService.stats();
|
final IngestStats afterFirstRequestStats = ingestService.stats();
|
||||||
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
|
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
|
||||||
|
@ -793,23 +790,21 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
|
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
|
||||||
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
|
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
|
||||||
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
|
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
|
||||||
}
|
|
||||||
|
|
||||||
// issue: https://github.com/elastic/elasticsearch/issues/18126
|
//update cluster state and ensure that new stats are added to old stats
|
||||||
public void testUpdatingStatsWhenRemovingPipelineWorks() {
|
putRequest = new PutPipelineRequest("_id1",
|
||||||
IngestService ingestService = createWithProcessors();
|
new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
|
||||||
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
|
previousClusterState = clusterState;
|
||||||
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
||||||
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
||||||
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
|
indexRequest.setPipeline("_id1");
|
||||||
assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1"));
|
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||||
assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2"));
|
final IngestStats afterThirdRequestStats = ingestService.stats();
|
||||||
|
assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2));
|
||||||
|
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L));
|
||||||
|
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
|
||||||
|
assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L));
|
||||||
|
|
||||||
configurationMap = new HashMap<>();
|
|
||||||
configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON));
|
|
||||||
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
|
|
||||||
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1")));
|
|
||||||
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteWithDrop() {
|
public void testExecuteWithDrop() {
|
||||||
|
|
|
@ -18,20 +18,17 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.ingest.CompoundProcessor;
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
|
||||||
import org.elasticsearch.ingest.IngestService;
|
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
|
||||||
import org.elasticsearch.ingest.PipelineProcessor;
|
|
||||||
import org.elasticsearch.ingest.Processor;
|
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -130,4 +127,81 @@ public class PipelineProcessorTests extends ESTestCase {
|
||||||
outerProc.execute(testIngestDocument);
|
outerProc.execute(testIngestDocument);
|
||||||
outerProc.execute(testIngestDocument);
|
outerProc.execute(testIngestDocument);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPipelineProcessorWithPipelineChain() throws Exception {
|
||||||
|
String pipeline1Id = "pipeline1";
|
||||||
|
String pipeline2Id = "pipeline2";
|
||||||
|
String pipeline3Id = "pipeline3";
|
||||||
|
IngestService ingestService = mock(IngestService.class);
|
||||||
|
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
|
||||||
|
|
||||||
|
Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
|
||||||
|
pipeline1ProcessorConfig.put("pipeline", pipeline2Id);
|
||||||
|
PipelineProcessor pipeline1Processor = factory.create(Collections.emptyMap(), null, pipeline1ProcessorConfig);
|
||||||
|
|
||||||
|
Map<String, Object> pipeline2ProcessorConfig = new HashMap<>();
|
||||||
|
pipeline2ProcessorConfig.put("pipeline", pipeline3Id);
|
||||||
|
PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
|
||||||
|
|
||||||
|
Clock clock = mock(Clock.class);
|
||||||
|
when(clock.millis()).thenReturn(0L).thenReturn(0L);
|
||||||
|
Pipeline pipeline1 = new Pipeline(
|
||||||
|
pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock
|
||||||
|
);
|
||||||
|
|
||||||
|
String key1 = randomAlphaOfLength(10);
|
||||||
|
clock = mock(Clock.class);
|
||||||
|
when(clock.millis()).thenReturn(0L).thenReturn(3L);
|
||||||
|
Pipeline pipeline2 = new Pipeline(
|
||||||
|
pipeline2Id, null, null, new CompoundProcessor(true,
|
||||||
|
Arrays.asList(
|
||||||
|
new TestProcessor(ingestDocument -> {
|
||||||
|
ingestDocument.setFieldValue(key1, randomInt());
|
||||||
|
}),
|
||||||
|
pipeline2Processor),
|
||||||
|
Collections.emptyList()),
|
||||||
|
clock
|
||||||
|
);
|
||||||
|
clock = mock(Clock.class);
|
||||||
|
when(clock.millis()).thenReturn(0L).thenReturn(2L);
|
||||||
|
Pipeline pipeline3 = new Pipeline(
|
||||||
|
pipeline3Id, null, null, new CompoundProcessor(
|
||||||
|
new TestProcessor(ingestDocument -> {
|
||||||
|
throw new RuntimeException("error");
|
||||||
|
})), clock
|
||||||
|
);
|
||||||
|
when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
|
||||||
|
when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);
|
||||||
|
when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3);
|
||||||
|
|
||||||
|
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
|
//start the chain
|
||||||
|
ingestDocument.executePipeline(pipeline1);
|
||||||
|
assertNotNull(ingestDocument.getSourceAndMetadata().get(key1));
|
||||||
|
|
||||||
|
//check the stats
|
||||||
|
IngestStats.Stats pipeline1Stats = pipeline1.getMetrics().createStats();
|
||||||
|
IngestStats.Stats pipeline2Stats = pipeline2.getMetrics().createStats();
|
||||||
|
IngestStats.Stats pipeline3Stats = pipeline3.getMetrics().createStats();
|
||||||
|
|
||||||
|
//current
|
||||||
|
assertThat(pipeline1Stats.getIngestCurrent(), equalTo(0L));
|
||||||
|
assertThat(pipeline2Stats.getIngestCurrent(), equalTo(0L));
|
||||||
|
assertThat(pipeline3Stats.getIngestCurrent(), equalTo(0L));
|
||||||
|
|
||||||
|
//count
|
||||||
|
assertThat(pipeline1Stats.getIngestCount(), equalTo(1L));
|
||||||
|
assertThat(pipeline2Stats.getIngestCount(), equalTo(1L));
|
||||||
|
assertThat(pipeline3Stats.getIngestCount(), equalTo(1L));
|
||||||
|
|
||||||
|
//time
|
||||||
|
assertThat(pipeline1Stats.getIngestTimeInMillis(), equalTo(0L));
|
||||||
|
assertThat(pipeline2Stats.getIngestTimeInMillis(), equalTo(3L));
|
||||||
|
assertThat(pipeline3Stats.getIngestTimeInMillis(), equalTo(2L));
|
||||||
|
|
||||||
|
//failure
|
||||||
|
assertThat(pipeline1Stats.getIngestFailedCount(), equalTo(0L));
|
||||||
|
assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
|
||||||
|
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue