mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Decouple enrich processor factory from enrich policy (#45826)
This commit changes the enrich processor factory to read the configuration it needs to create a processor from the current enrich index (from the `_meta` mapping field). Before this change, the required config was read from the enrich policy in the cluster state. Enrich policies are going to be stored in an index (instead of the cluster state), and a processor factory has no way to load something from an index. With this change we therefore read the required config / info from the enrich index (which is derived from the enrich policy), which in turn allows us to move enrich policies to an index. As a consequence, a policy must be executed before a pipeline that uses it is created; otherwise there is no enrich index, and there is no way to validate that the policy exists or to retrieve its type and match field. Relates to #32789
This commit is contained in:
parent
cb42e19a32
commit
6067065ed6
@ -5,11 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
@ -34,11 +29,16 @@ import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(EnrichPolicyMaintenanceService.class);
|
||||
|
||||
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME;
|
||||
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME;
|
||||
private static final IndicesOptions IGNORE_UNAVAILABLE = IndicesOptions.fromOptions(true, false, false, false);
|
||||
|
||||
private final Settings settings;
|
||||
|
@ -5,14 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
@ -51,13 +43,21 @@ import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
||||
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class EnrichPolicyRunner implements Runnable {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
|
||||
|
||||
static final String ENRICH_POLICY_FIELD_NAME = "enrich_policy";
|
||||
static final String ENRICH_POLICY_NAME_FIELD_NAME = "enrich_policy_name";
|
||||
static final String ENRICH_POLICY_TYPE_FIELD_NAME = "enrich_policy_type";
|
||||
static final String ENRICH_MATCH_FIELD_NAME = "enrich_match_field";
|
||||
|
||||
private final String policyName;
|
||||
private final EnrichPolicy policy;
|
||||
@ -216,8 +216,9 @@ public class EnrichPolicyRunner implements Runnable {
|
||||
.endObject()
|
||||
.endObject()
|
||||
.startObject("_meta")
|
||||
.field(ENRICH_POLICY_FIELD_NAME, policyName)
|
||||
.field(ENRICH_KEY_FIELD_NAME, policy.getMatchField())
|
||||
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
|
||||
.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField())
|
||||
.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType())
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
|
@ -7,6 +7,10 @@ package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
@ -19,7 +23,8 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
|
||||
|
||||
static final String TYPE = "enrich";
|
||||
private final Client client;
|
||||
volatile Map<String, EnrichPolicy> policies = Collections.emptyMap();
|
||||
|
||||
volatile MetaData metaData;
|
||||
|
||||
EnrichProcessorFactory(Client client) {
|
||||
this.client = client;
|
||||
@ -28,36 +33,37 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
|
||||
/**
 * Creates an enrich processor from one processor definition of a pipeline.
 *
 * Reads the string properties "policy_name", "field" and "target_field" and the
 * boolean properties "ignore_missing" (default false) and "override" (default true)
 * from {@code config}. The statements that use {@code metaData} look up the alias
 * named {@code EnrichPolicy.getBaseName(policyName)}, take its single backing index
 * and read the policy type and match field from the "_meta" section of that index's
 * mapping.
 *
 * NOTE(review): this listing interleaves statements that read from the
 * {@code policies} map with statements that read from {@code metaData}; going by
 * the commit description the {@code metaData} ones are the replacements - confirm
 * against the real file before relying on either set.
 *
 * @param processorFactories the other registered processor factories (not used by the visible code)
 * @param tag                the processor tag, passed to the property readers and to the processor
 * @param config             the processor configuration map
 * @return an {@code ExactMatchProcessor} when the policy type is {@code EnrichPolicy.EXACT_MATCH_TYPE}
 * @throws IllegalArgumentException if no alias exists for the policy or the policy type is unsupported
 */
@Override
|
||||
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
|
||||
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
|
||||
EnrichPolicy policy = policies.get(policyName);
|
||||
if (policy == null) {
|
||||
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
|
||||
String policyAlias = EnrichPolicy.getBaseName(policyName);
|
||||
// NOTE(review): metaData is only assigned in accept(ClusterState) (and directly by tests);
// presumably accept has always run before create - confirm, otherwise this dereferences null.
AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(policyAlias);
|
||||
if (aliasOrIndex == null) {
|
||||
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
|
||||
}
|
||||
assert aliasOrIndex.isAlias();
|
||||
assert aliasOrIndex.getIndices().size() == 1;
|
||||
IndexMetaData imd = aliasOrIndex.getIndices().get(0);
|
||||
|
||||
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
|
||||
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
|
||||
String policyType =
|
||||
(String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap);
|
||||
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);
|
||||
|
||||
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
|
||||
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
|
||||
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
|
||||
|
||||
switch (policy.getType()) {
|
||||
// NOTE(review): a switch on a null String throws NullPointerException; if an enrich index
// mapping can lack the "_meta" policy type entry, policyType needs a guard - confirm.
switch (policyType) {
|
||||
case EnrichPolicy.EXACT_MATCH_TYPE:
|
||||
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
|
||||
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
|
||||
ignoreMissing, overrideEnabled);
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
|
||||
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Receives a cluster state and updates the fields this factory keeps from it.
 *
 * The final statement stores {@code state.getMetaData()} in {@code metaData}, which
 * {@code create} uses for its alias lookup. The earlier statements read the
 * {@code EnrichMetadata} custom from the state, return early when it is absent or
 * when its policies equal the cached ones, and otherwise cache the policies.
 *
 * NOTE(review): this listing interleaves old and new statements of the commit and
 * does not show which ones remain; if the early returns are still present, they
 * would also skip the {@code metaData} assignment - confirm against the real file.
 *
 * @param state the cluster state to read from
 */
@Override
|
||||
public void accept(ClusterState state) {
|
||||
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
|
||||
if (enrichMetadata == null) {
|
||||
return;
|
||||
}
|
||||
if (policies.equals(enrichMetadata.getPolicies())) {
|
||||
return;
|
||||
}
|
||||
|
||||
policies = enrichMetadata.getPolicies();
|
||||
metaData = state.getMetaData();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
|
||||
.startObject()
|
||||
.startObject(MapperService.SINGLE_MAPPING_NAME)
|
||||
.startObject("_meta")
|
||||
.field(EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME, forPolicy)
|
||||
.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy)
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
|
@ -8,38 +8,39 @@ package org.elasticsearch.xpack.enrich;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
|
||||
|
||||
/**
 * Returns the plugin classes to install on the single test node: the local enrich
 * plugin and, in the second return statement, the reindex plugin as well.
 *
 * {@code Collections.singleton} accepts exactly one element, so the two-plugin
 * variant builds its collection with {@code Arrays.asList} (fully qualified, so no
 * additional import is required).
 *
 * NOTE(review): the two return statements are presumably the old and new side of
 * the same changed line in this listing - confirm against the real file.
 *
 * @return the plugin classes for the test node
 */
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Collections.singleton(LocalStateEnrich.class);
|
||||
return java.util.Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
|
||||
}
|
||||
|
||||
public void testUpdatePolicyOnly() {
|
||||
IngestService ingestService = getInstanceFromNode(IngestService.class);
|
||||
EnrichProcessorFactory enrichProcessorFactory =
|
||||
(EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE);
|
||||
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
|
||||
|
||||
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
|
||||
"key1", Collections.singletonList("field1"));
|
||||
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
|
||||
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
|
||||
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
|
||||
assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());
|
||||
|
||||
String pipelineConfig =
|
||||
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
|
||||
|
@ -6,10 +6,16 @@
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@ -28,7 +34,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
|
||||
enrichValues);
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.policies = Collections.singletonMap("majestic", policy);
|
||||
factory.metaData = createMetaData("majestic", policy);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("policy_name", "majestic");
|
||||
@ -67,6 +73,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
public void testPolicyDoesNotExist() {
|
||||
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.metaData = MetaData.builder().build();
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("policy_name", "majestic");
|
||||
@ -92,15 +99,12 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
config.put("set_from", valuesConfig);
|
||||
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
|
||||
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
|
||||
assertThat(e.getMessage(), equalTo("no enrich index exists for policy with name [majestic]"));
|
||||
}
|
||||
|
||||
public void testPolicyNameMissing() {
|
||||
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
|
||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
|
||||
enrichValues);
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.policies = Collections.singletonMap("_name", policy);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("enrich_key", "host");
|
||||
@ -128,12 +132,12 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
|
||||
}
|
||||
|
||||
public void testUnsupportedPolicy() {
|
||||
public void testUnsupportedPolicy() throws Exception {
|
||||
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
|
||||
EnrichPolicy policy =
|
||||
new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.policies = Collections.singletonMap("majestic", policy);
|
||||
factory.metaData = createMetaData("majestic", policy);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("policy_name", "majestic");
|
||||
@ -153,7 +157,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
|
||||
Collections.singletonList("source_index"), "host", enrichValues);
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.policies = Collections.singletonMap("majestic", policy);
|
||||
factory.metaData = createMetaData("majestic", policy);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("policy_name", "majestic");
|
||||
@ -172,7 +176,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
|
||||
Collections.singletonList("source_index"), "host", enrichValues);
|
||||
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
|
||||
factory.policies = Collections.singletonMap("majestic", policy);
|
||||
factory.metaData = createMetaData("majestic", policy);
|
||||
|
||||
Map<String, Object> config1 = new HashMap<>();
|
||||
config1.put("policy_name", "majestic");
|
||||
@ -182,4 +186,18 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
||||
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
|
||||
}
|
||||
|
||||
/**
 * Builds a {@code MetaData} instance containing one index for the given policy, for
 * use as the {@code metaData} of an {@code EnrichProcessorFactory} in tests.
 *
 * The index is named {@code EnrichPolicy.getBaseName(name) + "-1"}, is configured with
 * one shard and zero replicas, and carries an alias named
 * {@code EnrichPolicy.getBaseName(name)}. Its "_doc" mapping holds a "_meta" object
 * with "enrich_match_field" and "enrich_policy_type" taken from {@code policy}.
 *
 * The mapping JSON is assembled by plain string concatenation, so the policy values
 * are inserted without any JSON escaping.
 *
 * @param name   the policy name from which the index and alias names are derived
 * @param policy the policy supplying the match field and type written to "_meta"
 * @return metadata containing the single index described above
 * @throws IOException declared by the signature; NOTE(review): presumably raised by
 *         {@code putMapping} when the mapping source cannot be parsed - confirm
 */
static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
// The "-1" suffix gives the index a name distinct from the alias added below.
IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1");
|
||||
builder.settings(settings);
|
||||
builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() +
|
||||
"\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}");
|
||||
builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build());
|
||||
return MetaData.builder().put(builder).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user