Keep track of the enrich key field in the enrich index. (#42022)
The enrich key field is being kept track in _meta field by the policy runner. The ingest processor uses the field name defined in enrich index _meta field and not in the policy. This will avoid problems if policy is changed without a new enrich index being created. This also complete decouples EnrichPolicy from ExactMatchProcessor. The following scenario results in failure without this change: 1) Create policy 2) Execute policy 3) Create pipeline with enrich processor 4) Use pipeline 5) Update enrich key in policy 6) Use pipeline, which then fails.
This commit is contained in:
parent
299ff70bfe
commit
57a4614a7b
|
@ -36,6 +36,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -127,7 +128,8 @@ public class IngestService implements ClusterStateApplier {
|
||||||
}
|
}
|
||||||
|
|
||||||
IndexShard indexShard = indexService.getShard(0);
|
IndexShard indexShard = indexService.getShard(0);
|
||||||
return indexShard.acquireSearcher("ingest");
|
IndexMetaData imd = state.metaData().index(index);
|
||||||
|
return new Tuple<>(imd, indexShard.acquireSearcher("ingest"));
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||||
|
@ -114,14 +116,16 @@ public interface Processor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
|
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
|
||||||
|
* The input of this function is an index expression and this function returns the {@link IndexMetaData}
|
||||||
|
* of the resolved locally allocated index and {@link Engine.Searcher} instance for the resolved index.
|
||||||
*
|
*
|
||||||
* The locally allocated index must be have a single primary shard.
|
* The locally allocated index must be have a single primary shard.
|
||||||
*/
|
*/
|
||||||
public final Function<String, Engine.Searcher> localShardSearcher;
|
public final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
|
||||||
|
|
||||||
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
|
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
|
||||||
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
|
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
|
||||||
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
|
IngestService ingestService, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
|
||||||
this.env = env;
|
this.env = env;
|
||||||
this.scriptService = scriptService;
|
this.scriptService = scriptService;
|
||||||
this.threadContext = threadContext;
|
this.threadContext = threadContext;
|
||||||
|
|
|
@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
|
@ -180,9 +182,9 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
static final String NAME = "test_processor";
|
static final String NAME = "test_processor";
|
||||||
|
|
||||||
private final Function<String, Engine.Searcher> localShardSearcher;
|
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
|
||||||
|
|
||||||
TestProcessor(String tag, Function<String, Engine.Searcher> localShardSearcher) {
|
TestProcessor(String tag, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
|
||||||
super(tag);
|
super(tag);
|
||||||
this.localShardSearcher = localShardSearcher;
|
this.localShardSearcher = localShardSearcher;
|
||||||
}
|
}
|
||||||
|
@ -190,7 +192,7 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase {
|
||||||
@Override
|
@Override
|
||||||
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
||||||
String indexExpression = "reference-index";
|
String indexExpression = "reference-index";
|
||||||
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) {
|
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) {
|
||||||
// Ensure that search wrapper has been invoked by checking the directory instance type:
|
// Ensure that search wrapper has been invoked by checking the directory instance type:
|
||||||
if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) {
|
if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) {
|
||||||
// asserting or throwing a AssertionError makes this test hang:
|
// asserting or throwing a AssertionError makes this test hang:
|
||||||
|
@ -210,9 +212,9 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
static class Factory implements Processor.Factory {
|
static class Factory implements Processor.Factory {
|
||||||
|
|
||||||
private final Function<String, Engine.Searcher> localShardSearcher;
|
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
|
||||||
|
|
||||||
Factory(Function<String, Engine.Searcher> localShardSearcher) {
|
Factory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
|
||||||
this.localShardSearcher = localShardSearcher;
|
this.localShardSearcher = localShardSearcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
|
||||||
return schedule;
|
return schedule;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getBaseName(String policyName) {
|
public static String getBaseName(String policyName) {
|
||||||
return ENRICH_INDEX_NAME_BASE + policyName;
|
return ENRICH_INDEX_NAME_BASE + policyName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.http.entity.ContentType;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.elasticsearch.client.Request;
|
import org.elasticsearch.client.Request;
|
||||||
import org.elasticsearch.client.Response;
|
import org.elasticsearch.client.Response;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
@ -45,6 +46,10 @@ public class EnrichIT extends ESRestTestCase {
|
||||||
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}");
|
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}");
|
||||||
assertOK(client().performRequest(putPolicyRequest));
|
assertOK(client().performRequest(putPolicyRequest));
|
||||||
|
|
||||||
|
// create index (remove when execute policy api has been added)
|
||||||
|
String mapping = "\"_meta\": {\"enrich_key_field\": \"host\"}";
|
||||||
|
createIndex(".enrich-my_policy", Settings.EMPTY, mapping);
|
||||||
|
|
||||||
// Add a single enrich document for now and then refresh:
|
// Add a single enrich document for now and then refresh:
|
||||||
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co");
|
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co");
|
||||||
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent());
|
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent());
|
||||||
|
|
|
@ -45,6 +45,8 @@ import org.elasticsearch.index.reindex.ReindexRequest;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
|
||||||
|
|
||||||
public class EnrichPolicyRunner implements Runnable {
|
public class EnrichPolicyRunner implements Runnable {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
|
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
|
||||||
|
@ -145,6 +147,9 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
.field("doc_values", false)
|
.field("doc_values", false)
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject()
|
.endObject()
|
||||||
|
.startObject("_meta")
|
||||||
|
.field(ENRICH_KEY_FIELD_NAME, policy.getEnrichKey())
|
||||||
|
.endObject()
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject();
|
.endObject();
|
||||||
|
|
||||||
|
@ -156,7 +161,7 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
|
|
||||||
private void prepareAndCreateEnrichIndex() {
|
private void prepareAndCreateEnrichIndex() {
|
||||||
long nowTimestamp = nowSupplier.getAsLong();
|
long nowTimestamp = nowSupplier.getAsLong();
|
||||||
String enrichIndexName = policy.getBaseName(policyName) + "-" + nowTimestamp;
|
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
|
||||||
Settings enrichIndexSettings = Settings.builder()
|
Settings enrichIndexSettings = Settings.builder()
|
||||||
.put("index.auto_expand_replicas", "0-all")
|
.put("index.auto_expand_replicas", "0-all")
|
||||||
.build();
|
.build();
|
||||||
|
@ -231,7 +236,7 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateEnrichPolicyAlias(final String destinationIndexName) {
|
private void updateEnrichPolicyAlias(final String destinationIndexName) {
|
||||||
String enrichIndexBase = policy.getBaseName(policyName);
|
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
|
||||||
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
|
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
|
||||||
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
|
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
|
||||||
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
|
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
|
||||||
|
|
|
@ -6,6 +6,8 @@
|
||||||
package org.elasticsearch.xpack.enrich;
|
package org.elasticsearch.xpack.enrich;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.ingest.ConfigurationUtils;
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
@ -22,10 +24,10 @@ final class EnrichProcessorFactory implements Processor.Factory {
|
||||||
static final String TYPE = "enrich";
|
static final String TYPE = "enrich";
|
||||||
|
|
||||||
private final Function<String, EnrichPolicy> policyLookup;
|
private final Function<String, EnrichPolicy> policyLookup;
|
||||||
private final Function<String, Engine.Searcher> searchProvider;
|
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;
|
||||||
|
|
||||||
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
|
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
|
||||||
Function<String, Engine.Searcher> searchProvider) {
|
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) {
|
||||||
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
|
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
|
||||||
this.searchProvider = searchProvider;
|
this.searchProvider = searchProvider;
|
||||||
}
|
}
|
||||||
|
@ -57,7 +59,7 @@ final class EnrichProcessorFactory implements Processor.Factory {
|
||||||
|
|
||||||
switch (policy.getType()) {
|
switch (policy.getType()) {
|
||||||
case EnrichPolicy.EXACT_MATCH_TYPE:
|
case EnrichPolicy.EXACT_MATCH_TYPE:
|
||||||
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
|
return new ExactMatchProcessor(tag, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
|
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,8 +11,10 @@ import org.apache.lucene.index.PostingsEnum;
|
||||||
import org.apache.lucene.index.Terms;
|
import org.apache.lucene.index.Terms;
|
||||||
import org.apache.lucene.index.TermsEnum;
|
import org.apache.lucene.index.TermsEnum;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
@ -29,8 +31,9 @@ import java.util.function.Function;
|
||||||
|
|
||||||
final class ExactMatchProcessor extends AbstractProcessor {
|
final class ExactMatchProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
private final Function<String, EnrichPolicy> policyLookup;
|
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
|
||||||
private final Function<String, Engine.Searcher> searchProvider;
|
|
||||||
|
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;
|
||||||
|
|
||||||
private final String policyName;
|
private final String policyName;
|
||||||
private final String enrichKey;
|
private final String enrichKey;
|
||||||
|
@ -38,14 +41,12 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
||||||
private final List<EnrichSpecification> specifications;
|
private final List<EnrichSpecification> specifications;
|
||||||
|
|
||||||
ExactMatchProcessor(String tag,
|
ExactMatchProcessor(String tag,
|
||||||
Function<String, EnrichPolicy> policyLookup,
|
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider,
|
||||||
Function<String, Engine.Searcher> searchProvider,
|
|
||||||
String policyName,
|
String policyName,
|
||||||
String enrichKey,
|
String enrichKey,
|
||||||
boolean ignoreMissing,
|
boolean ignoreMissing,
|
||||||
List<EnrichSpecification> specifications) {
|
List<EnrichSpecification> specifications) {
|
||||||
super(tag);
|
super(tag);
|
||||||
this.policyLookup = policyLookup;
|
|
||||||
this.searchProvider = searchProvider;
|
this.searchProvider = searchProvider;
|
||||||
this.policyName = policyName;
|
this.policyName = policyName;
|
||||||
this.enrichKey = enrichKey;
|
this.enrichKey = enrichKey;
|
||||||
|
@ -55,18 +56,16 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
|
||||||
final EnrichPolicy policy = policyLookup.apply(policyName);
|
|
||||||
if (policy == null) {
|
|
||||||
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
|
|
||||||
}
|
|
||||||
|
|
||||||
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
|
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return ingestDocument;
|
return ingestDocument;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: re-use the engine searcher between enriching documents from the same write request
|
// TODO: re-use the engine searcher between enriching documents from the same write request
|
||||||
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getBaseName(policyName))) {
|
Tuple<IndexMetaData, Engine.Searcher> tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName));
|
||||||
|
String enrichKeyField = getEnrichKeyField(tuple.v1());
|
||||||
|
|
||||||
|
try (Engine.Searcher engineSearcher = tuple.v2()) {
|
||||||
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
|
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
|
||||||
return ingestDocument;
|
return ingestDocument;
|
||||||
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
|
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
|
||||||
|
@ -74,9 +73,9 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
|
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
|
||||||
final Terms terms = leafReader.terms(policy.getEnrichKey());
|
final Terms terms = leafReader.terms(enrichKeyField);
|
||||||
if (terms == null) {
|
if (terms == null) {
|
||||||
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist");
|
throw new IllegalStateException("enrich key field does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
final TermsEnum tenum = terms.iterator();
|
final TermsEnum tenum = terms.iterator();
|
||||||
|
@ -124,4 +123,22 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
||||||
List<EnrichSpecification> getSpecifications() {
|
List<EnrichSpecification> getSpecifications() {
|
||||||
return specifications;
|
return specifications;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getEnrichKeyField(IndexMetaData imd) {
|
||||||
|
if (imd == null) {
|
||||||
|
throw new IllegalStateException("enrich index is missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> mappingSource = imd.mapping().getSourceAsMap();
|
||||||
|
Map<?, ?> meta = (Map<?, ?>) mappingSource.get("_meta");
|
||||||
|
if (meta == null) {
|
||||||
|
throw new IllegalStateException("_meta field is missing in enrich index");
|
||||||
|
}
|
||||||
|
|
||||||
|
String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME);
|
||||||
|
if (fieldName == null) {
|
||||||
|
throw new IllegalStateException("enrich key fieldname missing");
|
||||||
|
}
|
||||||
|
return fieldName;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,12 @@ import org.apache.lucene.index.IndexWriterConfig;
|
||||||
import org.apache.lucene.index.NoMergePolicy;
|
import org.apache.lucene.index.NoMergePolicy;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
|
@ -26,7 +30,6 @@ import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
|
||||||
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
|
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -37,6 +40,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
|
@ -53,16 +57,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
||||||
indexWriter.commit();
|
indexWriter.commit();
|
||||||
|
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
|
|
||||||
Collections.singletonList("majestic_index"), "key", Collections.emptyList(), "schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
@ -73,11 +70,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
@ -101,16 +95,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
||||||
indexWriter.commit();
|
indexWriter.commit();
|
||||||
|
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"),
|
|
||||||
"key", Collections.emptyList(), "schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
@ -133,16 +120,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
|
||||||
indexWriter.commit();
|
indexWriter.commit();
|
||||||
|
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"),
|
|
||||||
"key", Collections.emptyList(), "schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
@ -161,16 +141,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
|
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
|
||||||
indexWriter.commit();
|
indexWriter.commit();
|
||||||
|
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"),
|
|
||||||
"key", Collections.emptyList(), "schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
@ -193,43 +166,69 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
indexWriter.addDocument(document);
|
indexWriter.addDocument(document);
|
||||||
indexWriter.commit();
|
indexWriter.commit();
|
||||||
|
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"),
|
|
||||||
"key", Collections.emptyList(), "schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
|
|
||||||
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
||||||
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
|
||||||
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
|
||||||
|
|
||||||
ExactMatchProcessor processor =
|
ExactMatchProcessor processor =
|
||||||
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
|
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
|
||||||
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
Collections.singletonMap("domain", "elastic.co"));
|
Collections.singletonMap("domain", "elastic.co"));
|
||||||
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
|
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
|
||||||
assertThat(e.getMessage(), equalTo("enrich key field [key] does not exist"));
|
assertThat(e.getMessage(), equalTo("enrich key field does not exist"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPolicyMissing() {
|
public void testIndexMetadataMissing() {
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> null;
|
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(null, null);
|
||||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
|
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false,
|
||||||
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
IngestDocument ingestDocument =
|
|
||||||
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
|
Collections.singletonMap("domain", "elastic.co"));
|
||||||
|
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMetaFieldMissing() throws Exception {
|
||||||
|
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||||
|
IndexMetaData imd = IndexMetaData.builder("majestic_index")
|
||||||
|
.settings(indexSettings)
|
||||||
|
.putMapping("_doc", "{}")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(imd, null);
|
||||||
|
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false,
|
||||||
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
Collections.singletonMap("domain", "elastic.co"));
|
||||||
|
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEnrichKeyFieldNameMissing() throws Exception {
|
||||||
|
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||||
|
IndexMetaData imd = IndexMetaData.builder("majestic_index")
|
||||||
|
.settings(indexSettings)
|
||||||
|
.putMapping("_doc", "{\"_meta\": {}}")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(imd, null);
|
||||||
|
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false,
|
||||||
|
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
|
|
||||||
|
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
|
||||||
|
Collections.singletonMap("domain", "elastic.co"));
|
||||||
|
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIgnoreKeyMissing() throws Exception {
|
public void testIgnoreKeyMissing() throws Exception {
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), "key",
|
|
||||||
Collections.emptyList(),
|
|
||||||
"schedule");
|
|
||||||
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
|
|
||||||
{
|
{
|
||||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
|
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", ndexExpression -> null, "_name", "domain",
|
||||||
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
IngestDocument ingestDocument =
|
IngestDocument ingestDocument =
|
||||||
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
|
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
|
||||||
|
@ -239,7 +238,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
|
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
|
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", indexExpression -> null, "_name", "domain",
|
||||||
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
|
||||||
IngestDocument ingestDocument =
|
IngestDocument ingestDocument =
|
||||||
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
|
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
|
||||||
|
@ -267,4 +266,17 @@ public class ExactMatchProcessorTests extends ESTestCase {
|
||||||
return document;
|
return document;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Function<String, Tuple<IndexMetaData, Engine.Searcher>> createSearchProvider(IndexReader indexReader) throws Exception {
|
||||||
|
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||||
|
IndexMetaData imd = IndexMetaData.builder("majestic_index")
|
||||||
|
.settings(indexSettings)
|
||||||
|
.putMapping("_doc", "{\"_meta\": {\"" + ENRICH_KEY_FIELD_NAME +"\": \"key\"}}")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
|
||||||
|
Engine.Searcher searcher = new Engine.Searcher("_enrich", indexSearcher, indexReader);
|
||||||
|
return indexExpression -> new Tuple<>(imd, searcher);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue