Improve how internal representation of pipelines are updated (#42257)

If a single pipeline is updated then the internal representation of
all pipelines was updated. With this change, only the internal representation
of the pipelines that have been modified will be updated.

Prior to this change the IngestMetadata of the previous and current cluster
was used to determine whether the internal representation of pipelines
should be updated. If applying the previous cluster state change failed then
subsequent cluster state changes that have no changes to IngestMetadata
will not attempt to update the internal representation of the pipelines.

This commit, changes how the IngestService updates the internal representation
by keeping track of the underlying configuration and use that to detect
against the new IngestMetadata whether a pipeline configuration has been
changed and if so, then the internal pipeline representation will be updated.
This commit is contained in:
Martijn van Groningen 2019-05-27 09:21:30 +02:00
parent 4dbf6c0df9
commit 48a71459c0
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
2 changed files with 223 additions and 87 deletions

View File

@ -80,7 +80,7 @@ public class IngestService implements ClusterStateApplier {
// We know of all the processor factories when a node with all its plugin have been initialized. Also some // We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes // processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, Pipeline> pipelines = new HashMap<>(); private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric(); private final IngestMetric totalMetrics = new IngestMetric();
@ -236,7 +236,12 @@ public class IngestService implements ClusterStateApplier {
* Returns the pipeline by the specified id * Returns the pipeline by the specified id
*/ */
public Pipeline getPipeline(String id) { public Pipeline getPipeline(String id) {
return pipelines.get(id); PipelineHolder holder = pipelines.get(id);
if (holder != null) {
return holder.pipeline;
} else {
return null;
}
} }
public Map<String, Processor.Factory> getProcessorFactories() { public Map<String, Processor.Factory> getProcessorFactories() {
@ -252,52 +257,10 @@ public class IngestService implements ClusterStateApplier {
return new IngestInfo(processorInfoList); return new IngestInfo(processorInfoList);
} }
Map<String, Pipeline> pipelines() { Map<String, PipelineHolder> pipelines() {
return pipelines; return pipelines;
} }
@Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
Map<String, Pipeline> originalPipelines = pipelines;
try {
innerUpdatePipelines(event.previousState(), state);
} catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
}
//pipelines changed, so add the old metrics to the new metrics
if (originalPipelines != pipelines) {
pipelines.forEach((id, pipeline) -> {
Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
}
});
}
}
/** /**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
@ -324,26 +287,7 @@ public class IngestService implements ClusterStateApplier {
return processorMetrics; return processorMetrics;
} }
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
Processor failureProcessor = new AbstractProcessor(tag) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
throw new IllegalStateException(errorMessage);
}
@Override
public String getType() {
return type;
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
}
public static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
Map<String, PipelineConfiguration> pipelines; Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) { if (currentIngestMetadata != null) {
@ -403,10 +347,11 @@ public class IngestService implements ClusterStateApplier {
String pipelineId = indexRequest.getPipeline(); String pipelineId = indexRequest.getPipeline();
if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
try { try {
Pipeline pipeline = pipelines.get(pipelineId); PipelineHolder holder = pipelines.get(pipelineId);
if (pipeline == null) { if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
} }
Pipeline pipeline = holder.pipeline;
innerExecute(indexRequest, pipeline, itemDroppedHandler); innerExecute(indexRequest, pipeline, itemDroppedHandler);
//this shouldn't be needed here but we do it for consistency with index api //this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution // which requires it to prevent double execution
@ -424,7 +369,8 @@ public class IngestService implements ClusterStateApplier {
public IngestStats stats() { public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder(); IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics); statsBuilder.addTotalMetrics(totalMetrics);
pipelines.forEach((id, pipeline) -> { pipelines.forEach((id, holder) -> {
Pipeline pipeline = holder.pipeline;
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>(); List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
@ -503,37 +449,146 @@ public class IngestService implements ClusterStateApplier {
} }
} }
private void innerUpdatePipelines(ClusterState previousState, ClusterState state) { @Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return; return;
} }
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); if (newIngestMetadata == null) {
if (Objects.equals(ingestMetadata, previousIngestMetadata)) {
return; return;
} }
Map<String, Pipeline> pipelines = new HashMap<>(); try {
List<ElasticsearchParseException> exceptions = new ArrayList<>(); innerUpdatePipelines(newIngestMetadata);
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { } catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
}
}
void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
Map<String, PipelineHolder> existingPipelines = this.pipelines;
// Lazy initialize these variables in order to favour the most like scenario that there are no pipeline changes:
Map<String, PipelineHolder> newPipelines = null;
List<ElasticsearchParseException> exceptions = null;
// Iterate over pipeline configurations in ingest metadata and constructs a new pipeline if there is no pipeline
// or the pipeline configuration has been modified
for (PipelineConfiguration newConfiguration : newIngestMetadata.getPipelines().values()) {
PipelineHolder previous = existingPipelines.get(newConfiguration.getId());
if (previous != null && previous.configuration.equals(newConfiguration)) {
continue;
}
if (newPipelines == null) {
newPipelines = new HashMap<>(existingPipelines);
}
try { try {
pipelines.put( Pipeline newPipeline =
pipeline.getId(), Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories, scriptService);
Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories, scriptService) newPipelines.put(
newConfiguration.getId(),
new PipelineHolder(newConfiguration, newPipeline)
); );
if (previous == null) {
continue;
}
Pipeline oldPipeline = previous.pipeline;
newPipeline.getMetrics().add(oldPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
} catch (ElasticsearchParseException e) { } catch (ElasticsearchParseException e) {
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); Pipeline pipeline = substitutePipeline(newConfiguration.getId(), e);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
if (exceptions == null) {
exceptions = new ArrayList<>();
}
exceptions.add(e); exceptions.add(e);
} catch (Exception e) { } catch (Exception e) {
ElasticsearchParseException parseException = new ElasticsearchParseException( ElasticsearchParseException parseException = new ElasticsearchParseException(
"Error updating pipeline with id [" + pipeline.getId() + "]", e); "Error updating pipeline with id [" + newConfiguration.getId() + "]", e);
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); Pipeline pipeline = substitutePipeline(newConfiguration.getId(), parseException);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
if (exceptions == null) {
exceptions = new ArrayList<>();
}
exceptions.add(parseException); exceptions.add(parseException);
} }
} }
this.pipelines = Collections.unmodifiableMap(pipelines);
ExceptionsHelper.rethrowAndSuppress(exceptions); // Iterate over the current active pipelines and check whether they are missing in the pipeline configuration and
// if so delete the pipeline from new Pipelines map:
for (Map.Entry<String, PipelineHolder> entry : existingPipelines.entrySet()) {
if (newIngestMetadata.getPipelines().get(entry.getKey()) == null) {
if (newPipelines == null) {
newPipelines = new HashMap<>(existingPipelines);
}
newPipelines.remove(entry.getKey());
}
}
if (newPipelines != null) {
// Update the pipelines:
this.pipelines = Collections.unmodifiableMap(newPipelines);
// Rethrow errors that may have occurred during creating new pipeline instances:
if (exceptions != null) {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
}
}
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
Processor failureProcessor = new AbstractProcessor(tag) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
throw new IllegalStateException(errorMessage);
}
@Override
public String getType() {
return type;
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
}
static class PipelineHolder {
final PipelineConfiguration configuration;
final Pipeline pipeline;
PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
this.configuration = Objects.requireNonNull(configuration);
this.pipeline = Objects.requireNonNull(pipeline);
}
} }
} }

View File

@ -153,10 +153,91 @@ public class IngestServiceTests extends ESTestCase {
.build(); .build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.pipelines().size(), is(1)); assertThat(ingestService.pipelines().size(), is(1));
assertThat(ingestService.pipelines().get("_id").getId(), equalTo("_id")); assertThat(ingestService.pipelines().get("_id").pipeline.getId(), equalTo("_id"));
assertThat(ingestService.pipelines().get("_id").getDescription(), nullValue()); assertThat(ingestService.pipelines().get("_id").pipeline.getDescription(), nullValue());
assertThat(ingestService.pipelines().get("_id").getProcessors().size(), equalTo(1)); assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().size(), equalTo(1));
assertThat(ingestService.pipelines().get("_id").getProcessors().get(0).getType(), equalTo("set")); assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().get(0).getType(), equalTo("set"));
}
public void testInnerUpdatePipelines() {
IngestService ingestService = createWithProcessors();
assertThat(ingestService.pipelines().size(), is(0));
PipelineConfiguration pipeline1 = new PipelineConfiguration("_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON);
IngestMetadata ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1));
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines().size(), is(1));
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
PipelineConfiguration pipeline2 = new PipelineConfiguration("_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON);
ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2));
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines().size(), is(2));
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
PipelineConfiguration pipeline3 = new PipelineConfiguration("_id3", new BytesArray("{\"processors\": []}"), XContentType.JSON);
ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3));
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines().size(), is(3));
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3));
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines().size(), is(2));
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
pipeline3 = new PipelineConfiguration(
"_id3",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON
);
ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3));
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines().size(), is(2));
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(1));
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().get(0).getType(), equalTo("set"));
// Perform an update with no changes:
Map<String, IngestService.PipelineHolder> pipelines = ingestService.pipelines();
ingestService.innerUpdatePipelines(ingestMetadata);
assertThat(ingestService.pipelines(), sameInstance(pipelines));
}
private static <K, V> Map<K, V> mapOf(K key, V value) {
return Collections.singletonMap(key, value);
}
private static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
return map;
}
private static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
return map;
} }
public void testDelete() { public void testDelete() {