Add support for overwrite parameter in the enrich processor. (#45029)
Similar to how it is supported in the set processor: https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html Relates to #32789
This commit is contained in:
parent
39f280364b
commit
e3fd1e6c7d
|
@ -37,6 +37,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
|
|||
|
||||
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
|
||||
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
|
||||
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
|
||||
|
||||
final List<EnrichSpecification> specifications;
|
||||
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
|
||||
|
@ -54,7 +55,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
|
|||
|
||||
switch (policy.getType()) {
|
||||
case EnrichPolicy.EXACT_MATCH_TYPE:
|
||||
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, specifications);
|
||||
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
private final String policyName;
|
||||
private final String enrichKey;
|
||||
private final boolean ignoreMissing;
|
||||
private final boolean overrideEnabled;
|
||||
private final List<EnrichSpecification> specifications;
|
||||
|
||||
ExactMatchProcessor(String tag,
|
||||
|
@ -39,6 +40,7 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
String policyName,
|
||||
String enrichKey,
|
||||
boolean ignoreMissing,
|
||||
boolean overrideEnabled,
|
||||
List<EnrichSpecification> specifications) {
|
||||
this(
|
||||
tag,
|
||||
|
@ -46,6 +48,7 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
policyName,
|
||||
enrichKey,
|
||||
ignoreMissing,
|
||||
overrideEnabled,
|
||||
specifications
|
||||
);
|
||||
}
|
||||
|
@ -55,12 +58,14 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
String policyName,
|
||||
String enrichKey,
|
||||
boolean ignoreMissing,
|
||||
boolean overrideEnabled,
|
||||
List<EnrichSpecification> specifications) {
|
||||
super(tag);
|
||||
this.searchRunner = searchRunner;
|
||||
this.policyName = policyName;
|
||||
this.enrichKey = enrichKey;
|
||||
this.ignoreMissing = ignoreMissing;
|
||||
this.overrideEnabled = overrideEnabled;
|
||||
this.specifications = specifications;
|
||||
}
|
||||
|
||||
|
@ -111,7 +116,9 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
|
||||
for (EnrichSpecification specification : specifications) {
|
||||
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
|
||||
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
|
||||
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
|
||||
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
|
||||
}
|
||||
}
|
||||
handler.accept(ingestDocument, null);
|
||||
});
|
||||
|
@ -142,6 +149,10 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
return ignoreMissing;
|
||||
}
|
||||
|
||||
boolean isOverrideEnabled() {
|
||||
return overrideEnabled;
|
||||
}
|
||||
|
||||
List<EnrichSpecification> getSpecifications() {
|
||||
return specifications;
|
||||
}
|
||||
|
|
|
@ -39,6 +39,11 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
|||
config.put("ignore_missing", keyIgnoreMissing);
|
||||
}
|
||||
|
||||
Boolean overrideEnabled = randomBoolean() ? null : randomBoolean();
|
||||
if (overrideEnabled != null) {
|
||||
config.put("override", overrideEnabled);
|
||||
}
|
||||
|
||||
int numRandomValues = randomIntBetween(1, 8);
|
||||
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
|
||||
for (int i = 0; i < numRandomValues; i++) {
|
||||
|
@ -59,6 +64,11 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(result.getPolicyName(), equalTo("majestic"));
|
||||
assertThat(result.getEnrichKey(), equalTo("host"));
|
||||
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
|
||||
if (overrideEnabled != null) {
|
||||
assertThat(result.isOverrideEnabled(), is(overrideEnabled));
|
||||
} else {
|
||||
assertThat(result.isOverrideEnabled(), is(true));
|
||||
}
|
||||
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
|
||||
for (int i = 0; i < numRandomValues; i++) {
|
||||
EnrichSpecification actual = result.getSpecifications().get(i);
|
||||
|
|
|
@ -56,7 +56,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
documents.put("elastic.co", document1);
|
||||
}
|
||||
MockSearchFunction mockSearch = mockedSearchFunction(documents);
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
|
||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||
Collections.singletonMap("domain", "elastic.co"));
|
||||
|
@ -85,7 +85,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
|
||||
public void testNoMatch() throws Exception {
|
||||
MockSearchFunction mockSearch = mockedSearchFunction();
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
|
||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||
Collections.singletonMap("domain", "elastic.com"));
|
||||
|
@ -115,7 +115,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
public void testSearchFailure() throws Exception {
|
||||
String indexName = ".enrich-_name";
|
||||
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
|
||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||
Collections.singletonMap("domain", "elastic.com"));
|
||||
|
@ -150,7 +150,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
public void testIgnoreKeyMissing() throws Exception {
|
||||
{
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
|
||||
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
true, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||
Collections.emptyMap());
|
||||
|
||||
|
@ -162,7 +162,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
}
|
||||
{
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
|
||||
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
false, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||
Collections.emptyMap());
|
||||
IngestDocument[] resultHolder = new IngestDocument[1];
|
||||
|
@ -177,6 +177,43 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testExistingFieldWithOverrideDisabled() throws Exception {
|
||||
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
|
||||
Collections.singletonList(new EnrichSpecification("tld", "tld")));
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
|
||||
IngestDocument[] resultHolder = new IngestDocument[1];
|
||||
Exception[] exceptionHolder = new Exception[1];
|
||||
processor.execute(ingestDocument, (result, e) -> {
|
||||
resultHolder[0] = result;
|
||||
exceptionHolder[0] = e;
|
||||
});
|
||||
assertThat(exceptionHolder[0], nullValue());
|
||||
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
|
||||
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo("tld"));
|
||||
}
|
||||
|
||||
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
|
||||
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
|
||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
|
||||
Collections.singletonList(new EnrichSpecification("tld", "tld")));
|
||||
|
||||
Map<String, Object> source = new HashMap<>();
|
||||
source.put("domain", "elastic.co");
|
||||
source.put("tld", null);
|
||||
IngestDocument ingestDocument = new IngestDocument(source, mapOf());
|
||||
IngestDocument[] resultHolder = new IngestDocument[1];
|
||||
Exception[] exceptionHolder = new Exception[1];
|
||||
processor.execute(ingestDocument, (result, e) -> {
|
||||
resultHolder[0] = result;
|
||||
exceptionHolder[0] = e;
|
||||
});
|
||||
assertThat(exceptionHolder[0], nullValue());
|
||||
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
|
||||
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo(null));
|
||||
}
|
||||
|
||||
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
|
||||
private final SearchResponse mockResponse;
|
||||
private final SetOnce<SearchRequest> capturedRequest;
|
||||
|
@ -240,4 +277,29 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
|||
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
|
||||
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
|
||||
}
|
||||
|
||||
static <K, V> Map<K, V> mapOf() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
static <K, V> Map<K, V> mapOf(K key1, V value1) {
|
||||
Map<K, V> map = new HashMap<>();
|
||||
map.put(key1, value1);
|
||||
return map;
|
||||
}
|
||||
|
||||
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
|
||||
Map<K, V> map = new HashMap<>();
|
||||
map.put(key1, value1);
|
||||
map.put(key2, value2);
|
||||
return map;
|
||||
}
|
||||
|
||||
static Map<String, ?> mapOf(String key1, Object value1, String key2, Object value2, String key3, Object value3) {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put(key1, value1);
|
||||
map.put(key2, value2);
|
||||
map.put(key3, value3);
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue