From 48a71459c0a6bca5885cfea32179a315e4b66758 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Mon, 27 May 2019 09:21:30 +0200
Subject: [PATCH] Improve how the internal representation of pipelines is
 updated (#42257)

Previously, if a single pipeline was updated, the internal representation of
all pipelines was rebuilt. With this change, only the internal representation
of the pipelines that have actually been modified is updated.

Prior to this change, the IngestMetadata of the previous and current cluster
state was used to determine whether the internal representation of pipelines
should be updated. If applying the previous cluster state change failed, then
subsequent cluster state changes that carry no changes to IngestMetadata would
never attempt to update the internal representation of the pipelines again.

This commit changes how the IngestService updates its internal representation:
it keeps track of the underlying configuration of each pipeline and compares
that against the new IngestMetadata to detect whether a pipeline configuration
has changed; only if it has is the internal pipeline representation updated.
---
 .../elasticsearch/ingest/IngestService.java   | 221 +++++++++++-------
 .../ingest/IngestServiceTests.java            |  89 ++++++-
 2 files changed, 223 insertions(+), 87 deletions(-)
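In essence, the change pairs each compiled pipeline with the configuration it
was compiled from, so a later update can be diffed against what is already
loaded and only the changed entries are rebuilt. Below is a minimal,
self-contained sketch of that idea; the names (PipelineCache, Holder, the
compiler function) are hypothetical, and this is not the actual IngestService
code:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Sketch: keep the source configuration next to the compiled object so a
    // new set of configurations can be diffed against what is loaded.
    final class PipelineCache {

        static final class Holder {
            final String configuration;
            final Object compiled;

            Holder(String configuration, Object compiled) {
                this.configuration = configuration;
                this.compiled = compiled;
            }
        }

        private volatile Map<String, Holder> holders = Collections.emptyMap();

        Object get(String id) {
            Holder holder = holders.get(id);
            return holder != null ? holder.compiled : null;
        }

        void update(Map<String, String> newConfigs, Function<String, Object> compiler) {
            Map<String, Holder> existing = this.holders;
            Map<String, Holder> updated = null; // allocated lazily: no change, no copy

            for (Map.Entry<String, String> entry : newConfigs.entrySet()) {
                Holder previous = existing.get(entry.getKey());
                if (previous != null && previous.configuration.equals(entry.getValue())) {
                    continue; // unchanged configuration: keep the compiled instance
                }
                if (updated == null) {
                    updated = new HashMap<>(existing);
                }
                updated.put(entry.getKey(), new Holder(entry.getValue(), compiler.apply(entry.getValue())));
            }
            for (String id : existing.keySet()) {
                if (newConfigs.containsKey(id) == false) {
                    if (updated == null) {
                        updated = new HashMap<>(existing);
                    }
                    updated.remove(id); // configuration was deleted
                }
            }
            if (updated != null) {
                this.holders = Collections.unmodifiableMap(updated);
            }
        }
    }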
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 31023fc85d5..4229b147f70 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -80,7 +80,7 @@ public class IngestService implements ClusterStateApplier {
     // We know of all the processor factories when a node with all its plugin have been initialized. Also some
     // processor factories rely on other node services. Custom metadata is statically registered when classes
     // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
-    private volatile Map<String, Pipeline> pipelines = new HashMap<>();
+    private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
 
     private final ThreadPool threadPool;
     private final IngestMetric totalMetrics = new IngestMetric();
@@ -236,7 +236,12 @@ public class IngestService implements ClusterStateApplier {
      * Returns the pipeline by the specified id
      */
     public Pipeline getPipeline(String id) {
-        return pipelines.get(id);
+        PipelineHolder holder = pipelines.get(id);
+        if (holder != null) {
+            return holder.pipeline;
+        } else {
+            return null;
+        }
     }
 
     public Map<String, Processor.Factory> getProcessorFactories() {
@@ -252,52 +257,10 @@ public class IngestService implements ClusterStateApplier {
         return new IngestInfo(processorInfoList);
     }
 
-    Map<String, Pipeline> pipelines() {
+    Map<String, PipelineHolder> pipelines() {
         return pipelines;
     }
 
-    @Override
-    public void applyClusterState(final ClusterChangedEvent event) {
-        ClusterState state = event.state();
-        Map<String, Pipeline> originalPipelines = pipelines;
-        try {
-            innerUpdatePipelines(event.previousState(), state);
-        } catch (ElasticsearchParseException e) {
-            logger.warn("failed to update ingest pipelines", e);
-        }
-        //pipelines changed, so add the old metrics to the new metrics
-        if (originalPipelines != pipelines) {
-            pipelines.forEach((id, pipeline) -> {
-                Pipeline originalPipeline = originalPipelines.get(id);
-                if (originalPipeline != null) {
-                    pipeline.getMetrics().add(originalPipeline.getMetrics());
-                    List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
-                    List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
-                    getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
-                    getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
-                    //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
-                    //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
-                    //consistent id's per processor and/or semantic equals for each processor will be needed.
-                    if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
-                        Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
-                        for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
-                            String type = compositeMetric.v1().getType();
-                            IngestMetric metric = compositeMetric.v2();
-                            if (oldMetricsIterator.hasNext()) {
-                                Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
-                                String oldType = oldCompositeMetric.v1().getType();
-                                IngestMetric oldMetric = oldCompositeMetric.v2();
-                                if (type.equals(oldType)) {
-                                    metric.add(oldMetric);
-                                }
-                            }
-                        }
-                    }
-                }
-            });
-        }
-    }
-
     /**
      * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
      * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
@@ -324,26 +287,7 @@ public class IngestService implements ClusterStateApplier {
         return processorMetrics;
     }
 
-    private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
-        String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
-        String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
e.getHeader("processor_type").get(0) : "unknown"; - String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; - Processor failureProcessor = new AbstractProcessor(tag) { - @Override - public IngestDocument execute(IngestDocument ingestDocument) { - throw new IllegalStateException(errorMessage); - } - - @Override - public String getType() { - return type; - } - }; - String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; - return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); - } - - public static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); Map pipelines; if (currentIngestMetadata != null) { @@ -403,10 +347,11 @@ public class IngestService implements ClusterStateApplier { String pipelineId = indexRequest.getPipeline(); if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { try { - Pipeline pipeline = pipelines.get(pipelineId); - if (pipeline == null) { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } + Pipeline pipeline = holder.pipeline; innerExecute(indexRequest, pipeline, itemDroppedHandler); //this shouldn't be needed here but we do it for consistency with index api // which requires it to prevent double execution @@ -424,7 +369,8 @@ public class IngestService implements ClusterStateApplier { public IngestStats stats() { IngestStats.Builder statsBuilder = new IngestStats.Builder(); statsBuilder.addTotalMetrics(totalMetrics); - pipelines.forEach((id, pipeline) -> { + pipelines.forEach((id, holder) -> { + Pipeline pipeline = holder.pipeline; CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); List> processorMetrics = new ArrayList<>(); @@ -503,37 +449,146 @@ public class IngestService implements ClusterStateApplier { } } - private void innerUpdatePipelines(ClusterState previousState, ClusterState state) { + @Override + public void applyClusterState(final ClusterChangedEvent event) { + ClusterState state = event.state(); if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { return; } - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); - if (Objects.equals(ingestMetadata, previousIngestMetadata)) { + IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + if (newIngestMetadata == null) { return; } - Map pipelines = new HashMap<>(); - List exceptions = new ArrayList<>(); - for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + innerUpdatePipelines(newIngestMetadata); + } catch (ElasticsearchParseException e) { + logger.warn("failed to update ingest pipelines", e); + } + } + + void innerUpdatePipelines(IngestMetadata newIngestMetadata) { + Map existingPipelines = this.pipelines; + + // Lazy initialize these variables in order to favour the most like scenario that there are no pipeline changes: + Map newPipelines = null; + List exceptions = null; + // Iterate over pipeline configurations in ingest metadata and constructs a new 
+        for (PipelineConfiguration newConfiguration : newIngestMetadata.getPipelines().values()) {
+            PipelineHolder previous = existingPipelines.get(newConfiguration.getId());
+            if (previous != null && previous.configuration.equals(newConfiguration)) {
+                continue;
+            }
+
+            if (newPipelines == null) {
+                newPipelines = new HashMap<>(existingPipelines);
+            }
             try {
-                pipelines.put(
-                    pipeline.getId(),
-                    Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories, scriptService)
+                Pipeline newPipeline =
+                    Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories, scriptService);
+                newPipelines.put(
+                    newConfiguration.getId(),
+                    new PipelineHolder(newConfiguration, newPipeline)
                 );
+
+                if (previous == null) {
+                    continue;
+                }
+                Pipeline oldPipeline = previous.pipeline;
+                newPipeline.getMetrics().add(oldPipeline.getMetrics());
+                List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
+                List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
+                getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
+                getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
+                //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
+                //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
+                //consistent id's per processor and/or semantic equals for each processor will be needed.
+                if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
+                    Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
+                    for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
+                        String type = compositeMetric.v1().getType();
+                        IngestMetric metric = compositeMetric.v2();
+                        if (oldMetricsIterator.hasNext()) {
+                            Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
+                            String oldType = oldCompositeMetric.v1().getType();
+                            IngestMetric oldMetric = oldCompositeMetric.v2();
+                            if (type.equals(oldType)) {
+                                metric.add(oldMetric);
+                            }
+                        }
+                    }
+                }
             } catch (ElasticsearchParseException e) {
-                pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
+                Pipeline pipeline = substitutePipeline(newConfiguration.getId(), e);
+                newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
+                if (exceptions == null) {
+                    exceptions = new ArrayList<>();
+                }
                 exceptions.add(e);
             } catch (Exception e) {
                 ElasticsearchParseException parseException = new ElasticsearchParseException(
-                    "Error updating pipeline with id [" + pipeline.getId() + "]", e);
-                pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException));
+                    "Error updating pipeline with id [" + newConfiguration.getId() + "]", e);
+                Pipeline pipeline = substitutePipeline(newConfiguration.getId(), parseException);
+                newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
+                if (exceptions == null) {
+                    exceptions = new ArrayList<>();
+                }
                 exceptions.add(parseException);
             }
         }
-        this.pipelines = Collections.unmodifiableMap(pipelines);
-        ExceptionsHelper.rethrowAndSuppress(exceptions);
+
+        // Iterate over the current active pipelines, check whether they are missing from the new pipeline configuration
+        // and, if so, delete them from the new pipelines map:
+        for (Map.Entry<String, PipelineHolder> entry : existingPipelines.entrySet()) {
+            if (newIngestMetadata.getPipelines().get(entry.getKey()) == null) {
+                if (newPipelines == null) {
+                    newPipelines = new HashMap<>(existingPipelines);
+                }
+                newPipelines.remove(entry.getKey());
+            }
+        }
+
+        if (newPipelines != null) {
+            // Update the pipelines:
+            this.pipelines = Collections.unmodifiableMap(newPipelines);
+
+            // Rethrow errors that may have occurred while creating the new pipeline instances:
+            if (exceptions != null) {
+                ExceptionsHelper.rethrowAndSuppress(exceptions);
+            }
+        }
+    }
+
+    private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
+        String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
+        String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
+        String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
+        Processor failureProcessor = new AbstractProcessor(tag) {
+            @Override
+            public IngestDocument execute(IngestDocument ingestDocument) {
+                throw new IllegalStateException(errorMessage);
+            }
+
+            @Override
+            public String getType() {
+                return type;
+            }
+        };
+        String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
+        return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
+    }
+
+    static class PipelineHolder {
+
+        final PipelineConfiguration configuration;
+        final Pipeline pipeline;
+
+        PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
+            this.configuration = Objects.requireNonNull(configuration);
+            this.pipeline = Objects.requireNonNull(pipeline);
+        }
     }
 }
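The read side of this design is worth calling out (this is my reading of the
patch, not something it states explicitly): `pipelines` is a volatile
reference to an unmodifiable map, so readers take a single volatile load and
then work against an immutable snapshot; they can never observe a
half-applied update, and no locking is needed on the hot path. Sketched with
hypothetical names:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the publish/read pattern: writers build the map completely,
    // then publish it with a single volatile store; readers never see a
    // partially updated registry. Names are illustrative only.
    class SnapshotRegistry {

        private volatile Map<String, String> snapshot = Collections.emptyMap();

        String get(String id) {
            // One volatile read; the returned map is immutable, so repeated
            // lookups against it are mutually consistent.
            return snapshot.get(id);
        }

        void publish(Map<String, String> next) {
            snapshot = Collections.unmodifiableMap(new HashMap<>(next));
        }
    }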
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index e5aea1f5d5c..af4389140f4 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -153,10 +153,91 @@ public class IngestServiceTests extends ESTestCase {
             .build();
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         assertThat(ingestService.pipelines().size(), is(1));
-        assertThat(ingestService.pipelines().get("_id").getId(), equalTo("_id"));
-        assertThat(ingestService.pipelines().get("_id").getDescription(), nullValue());
-        assertThat(ingestService.pipelines().get("_id").getProcessors().size(), equalTo(1));
-        assertThat(ingestService.pipelines().get("_id").getProcessors().get(0).getType(), equalTo("set"));
+        assertThat(ingestService.pipelines().get("_id").pipeline.getId(), equalTo("_id"));
+        assertThat(ingestService.pipelines().get("_id").pipeline.getDescription(), nullValue());
+        assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().size(), equalTo(1));
+        assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().get(0).getType(), equalTo("set"));
+    }
+
+    public void testInnerUpdatePipelines() {
+        IngestService ingestService = createWithProcessors();
+        assertThat(ingestService.pipelines().size(), is(0));
+
+        PipelineConfiguration pipeline1 = new PipelineConfiguration("_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON);
+        IngestMetadata ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1));
+
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines().size(), is(1));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+
+        PipelineConfiguration pipeline2 = new PipelineConfiguration("_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON);
+        ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2));
+
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines().size(), is(2));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+        assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
+        assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
+
+        PipelineConfiguration pipeline3 = new PipelineConfiguration("_id3", new BytesArray("{\"processors\": []}"), XContentType.JSON);
+        ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3));
+
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines().size(), is(3));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+        assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
+        assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
+
+        ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3));
+
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines().size(), is(2));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
+
+        pipeline3 = new PipelineConfiguration(
+            "_id3", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON
+        );
+        ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3));
+
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines().size(), is(2));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
+        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(1));
+        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().get(0).getType(), equalTo("set"));
+
+        // Perform an update with no changes:
+        Map<String, IngestService.PipelineHolder> pipelines = ingestService.pipelines();
+        ingestService.innerUpdatePipelines(ingestMetadata);
+        assertThat(ingestService.pipelines(), sameInstance(pipelines));
+    }
+
+    private static <K, V> Map<K, V> mapOf(K key, V value) {
+        return Collections.singletonMap(key, value);
+    }
+
+    private static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
+        Map<K, V> map = new HashMap<>();
+        map.put(key1, value1);
+        map.put(key2, value2);
+        return map;
+    }
+
+    private static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3) {
+        Map<K, V> map = new HashMap<>();
+        map.put(key1, value1);
+        map.put(key2, value2);
+        map.put(key3, value3);
+        return map;
     }
 
     public void testDelete() {