From 484f5cee39a3d57c248684192550aca38a4bbfb7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 May 2019 08:24:33 +0200 Subject: [PATCH] Stricter update dependency between pipelines and components used by pipelines (#42038) Add support for components used by processor factories to get updated before processor factories create new processor instances. Components can register via `IngestService#addIngestClusterStateListener(...)` then if the internal representation of ingest pipelines get updated, these components get updated with the current cluster state before pipelines are updated. Registered EnrichProcessorFactory as ingest cluster state listener, so that it has always an up to date view of the active enrich policies. --- .../elasticsearch/ingest/IngestService.java | 19 ++++++ .../xpack/enrich/EnrichPlugin.java | 13 +--- .../xpack/enrich/EnrichProcessorFactory.java | 27 +++++--- .../xpack/enrich/EnrichPolicyUpdateTests.java | 61 +++++++++++++++++++ .../enrich/EnrichProcessorFactoryTests.java | 36 +++-------- 5 files changed, 112 insertions(+), 44 deletions(-) create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 84c76284121..8f22e7573dc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -67,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -90,6 +91,7 @@ public class IngestService implements ClusterStateApplier { private volatile Map pipelines = Collections.emptyMap(); private final ThreadPool threadPool; private final IngestMetric totalMetrics = new IngestMetric(); + private final List> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, @@ -418,6 +420,17 @@ public class IngestService implements ClusterStateApplier { return statsBuilder.build(); } + /** + * Adds a listener that gets invoked with the current cluster state before processor factories + * get invoked. + * + * This is useful for components that are used by ingest processors, so that have the opportunity to update + * before these components get used by the ingest processor factory. + */ + public void addIngestClusterStateListener(Consumer listener) { + ingestClusterStateListeners.add(listener); + } + //package private for testing static String getProcessorName(Processor processor){ // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name @@ -490,6 +503,12 @@ public class IngestService implements ClusterStateApplier { return; } + // Publish cluster state to components that are used by processor factories before letting + // processor factories create new processor instances. + // (Note that this needs to be done also in the case when there is no change to ingest metadata, because in the case + // when only the part of the cluster state that a component is interested in, is updated.) + ingestClusterStateListeners.forEach(consumer -> consumer.accept(state)); + IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); if (newIngestMetadata == null) { return; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 5057c1587fe..d2713017073 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -50,7 +49,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -71,15 +69,10 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { @Override public Map getProcessors(Processor.Parameters parameters) { - final ClusterService clusterService = parameters.ingestService.getClusterService(); - // Pipelines are created from cluster state update thead and calling ClusterService#state() from that thead is illegal - // (because the current cluster state update is in progress) - // So with the below atomic reference we keep track of the latest updated cluster state: - AtomicReference reference = new AtomicReference<>(); - clusterService.addStateApplier(event -> reference.set(event.state())); - + EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher); + parameters.ingestService.addIngestClusterStateListener(factory); return Collections.singletonMap(EnrichProcessorFactory.TYPE, - new EnrichProcessorFactory(reference::get, parameters.localShardSearcher)); + factory); } public List> getActions() { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index a1e1f1e1870..00e269ae995 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -13,29 +13,27 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; -final class EnrichProcessorFactory implements Processor.Factory { +final class EnrichProcessorFactory implements Processor.Factory, Consumer { static final String TYPE = "enrich"; - - private final Function policyLookup; private final Function> searchProvider; + volatile Map policies = Collections.emptyMap(); - EnrichProcessorFactory(Supplier clusterStateSupplier, - Function> searchProvider) { - this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get()); + EnrichProcessorFactory(Function> searchProvider) { this.searchProvider = searchProvider; } @Override public Processor create(Map processorFactories, String tag, Map config) throws Exception { String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); - EnrichPolicy policy = policyLookup.apply(policyName); + EnrichPolicy policy = policies.get(policyName); if (policy == null) { throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); } @@ -65,6 +63,19 @@ final class EnrichProcessorFactory implements Processor.Factory { } } + @Override + public void accept(ClusterState state) { + final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE); + if (enrichMetadata == null) { + return; + } + if (policies.equals(enrichMetadata.getPolicies())) { + return; + } + + policies = enrichMetadata.getPolicies(); + } + static final class EnrichSpecification { final String sourceField; diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java new file mode 100644 index 00000000000..1fd1204c312 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; + +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.sameInstance; + +public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singleton(EnrichPlugin.class); + } + + public void testUpdatePolicyOnly() { + IngestService ingestService = getInstanceFromNode(IngestService.class); + EnrichProcessorFactory enrichProcessorFactory = + (EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE); + + EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"), + "key1", Collections.singletonList("field1")); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1)); + + String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"enrich_values\": []}}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON); + assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet()); + Pipeline pipelineInstance1 = ingestService.getPipeline("1"); + assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(ExactMatchProcessor.class)); + + EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"), + "key2", Collections.singletonList("field2")); + putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance2); + assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); + assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance2)); + + Pipeline pipelineInstance2 = ingestService.getPipeline("1"); + assertThat(pipelineInstance2, sameInstance(pipelineInstance1)); + } + + +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java index 803a7a8526c..c92dee4710a 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java @@ -6,9 +6,6 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -20,7 +17,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -32,7 +28,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase { List enrichValues = Arrays.asList("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key", enrichValues); - EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + EnrichProcessorFactory factory = new EnrichProcessorFactory(null); + factory.policies = Collections.singletonMap("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -73,7 +70,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase { public void testPolicyDoesNotExist() { List enrichValues = Arrays.asList("globalRank", "tldRank", "tld"); - EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier(), null); + EnrichProcessorFactory factory = new EnrichProcessorFactory(null); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -106,7 +103,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase { List enrichValues = Arrays.asList("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key", enrichValues); - EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("_name", policy), null); + EnrichProcessorFactory factory = new EnrichProcessorFactory(null); + factory.policies = Collections.singletonMap("_name", policy); Map config = new HashMap<>(); config.put("enrich_key", "host"); @@ -138,7 +136,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase { List enrichValues = Arrays.asList("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues); - EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + EnrichProcessorFactory factory = new EnrichProcessorFactory(null); + factory.policies = Collections.singletonMap("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -167,11 +166,12 @@ public class EnrichProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]")); } - public void testNonExistingDecorateField() throws Exception { + public void testNonExistingDecorateField() { List enrichValues = Arrays.asList("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key", enrichValues); - EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null); + EnrichProcessorFactory factory = new EnrichProcessorFactory(null); + factory.policies = Collections.singletonMap("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -187,20 +187,4 @@ public class EnrichProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]")); } - private static Supplier createClusterStateSupplier(String policyName, EnrichPolicy policy) { - EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.singletonMap(policyName, policy)); - ClusterState state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build()) - .build(); - return () -> state; - } - - private static Supplier createClusterStateSupplier() { - EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.emptyMap()); - ClusterState state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build()) - .build(); - return () -> state; - } - }