Stricter update dependency between pipelines and components used by pipelines (#42038)

Add support for components used by processor factories to get updated
before processor factories create new processor instances.

Components can register via `IngestService#addIngestClusterStateListener(...)`
then if the internal  representation of ingest pipelines get updated,
these components get  updated with the current cluster state before
pipelines are updated.

Registered EnrichProcessorFactory as ingest cluster state listener, so
that it has always an up to date view of the active enrich policies.
This commit is contained in:
Martijn van Groningen 2019-05-28 08:24:33 +02:00
parent a91cec4c46
commit 484f5cee39
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 112 additions and 44 deletions

View File

@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -90,6 +91,7 @@ public class IngestService implements ClusterStateApplier {
private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric();
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@ -418,6 +420,17 @@ public class IngestService implements ClusterStateApplier {
return statsBuilder.build();
}
/**
* Adds a listener that gets invoked with the current cluster state before processor factories
* get invoked.
*
* This is useful for components that are used by ingest processors, so that have the opportunity to update
* before these components get used by the ingest processor factory.
*/
public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
ingestClusterStateListeners.add(listener);
}
//package private for testing
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
@ -490,6 +503,12 @@ public class IngestService implements ClusterStateApplier {
return;
}
// Publish cluster state to components that are used by processor factories before letting
// processor factories create new processor instances.
// (Note that this needs to be done also in the case when there is no change to ingest metadata, because in the case
// when only the part of the cluster state that a component is interested in, is updated.)
ingestClusterStateListeners.forEach(consumer -> consumer.accept(state));
IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
if (newIngestMetadata == null) {
return;

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -50,7 +49,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
@ -71,15 +69,10 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
final ClusterService clusterService = parameters.ingestService.getClusterService();
// Pipelines are created from cluster state update thead and calling ClusterService#state() from that thead is illegal
// (because the current cluster state update is in progress)
// So with the below atomic reference we keep track of the latest updated cluster state:
AtomicReference<ClusterState> reference = new AtomicReference<>();
clusterService.addStateApplier(event -> reference.set(event.state()));
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher);
parameters.ingestService.addIngestClusterStateListener(factory);
return Collections.singletonMap(EnrichProcessorFactory.TYPE,
new EnrichProcessorFactory(reference::get, parameters.localShardSearcher));
factory);
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

View File

@ -13,29 +13,27 @@ import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
final class EnrichProcessorFactory implements Processor.Factory {
final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> {
static final String TYPE = "enrich";
private final Function<String, EnrichPolicy> policyLookup;
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;
volatile Map<String, EnrichPolicy> policies = Collections.emptyMap();
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) {
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
EnrichProcessorFactory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) {
this.searchProvider = searchProvider;
}
@Override
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
EnrichPolicy policy = policyLookup.apply(policyName);
EnrichPolicy policy = policies.get(policyName);
if (policy == null) {
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}
@ -65,6 +63,19 @@ final class EnrichProcessorFactory implements Processor.Factory {
}
}
@Override
public void accept(ClusterState state) {
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata == null) {
return;
}
if (policies.equals(enrichMetadata.getPolicies())) {
return;
}
policies = enrichMetadata.getPolicies();
}
static final class EnrichSpecification {
final String sourceField;

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(EnrichPlugin.class);
}
public void testUpdatePolicyOnly() {
IngestService ingestService = getInstanceFromNode(IngestService.class);
EnrichProcessorFactory enrichProcessorFactory =
(EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE);
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
"key1", Collections.singletonList("field1"));
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"enrich_values\": []}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON);
assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet());
Pipeline pipelineInstance1 = ingestService.getPipeline("1");
assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(ExactMatchProcessor.class));
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
"key2", Collections.singletonList("field2"));
putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance2);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance2));
Pipeline pipelineInstance2 = ingestService.getPipeline("1");
assertThat(pipelineInstance2, sameInstance(pipelineInstance1));
}
}

View File

@ -6,9 +6,6 @@
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@ -20,7 +17,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -32,7 +28,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Collections.singletonMap("majestic", policy);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
@ -73,7 +70,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testPolicyDoesNotExist() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier(), null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
@ -106,7 +103,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("_name", policy), null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Collections.singletonMap("_name", policy);
Map<String, Object> config = new HashMap<>();
config.put("enrich_key", "host");
@ -138,7 +136,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy =
new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Collections.singletonMap("majestic", policy);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
@ -167,11 +166,12 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]"));
}
public void testNonExistingDecorateField() throws Exception {
public void testNonExistingDecorateField() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Collections.singletonMap("majestic", policy);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
@ -187,20 +187,4 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]"));
}
private static Supplier<ClusterState> createClusterStateSupplier(String policyName, EnrichPolicy policy) {
EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.singletonMap(policyName, policy));
ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build())
.build();
return () -> state;
}
private static Supplier<ClusterState> createClusterStateSupplier() {
EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.emptyMap());
ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build())
.build();
return () -> state;
}
}