Make enrich processor use search action through a client (#43311)

Add client to processor parameters in the ingest service.
Remove the search provider function from the processor parameters.
ExactMatchProcessor and Factory converted to use client.
Remove test cases that are no longer applicable from processor.
This commit is contained in:
James Baiera 2019-06-25 11:16:30 -04:00
parent 36f0e8a8bb
commit 1b902aa746
9 changed files with 226 additions and 624 deletions

View File

@ -30,14 +30,12 @@ import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -48,12 +46,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -95,10 +89,9 @@ public class IngestService implements ClusterStateApplier {
public IngestService(ClusterService clusterService, ThreadPool threadPool, public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins, IndicesService indicesService) { List<IngestPlugin> ingestPlugins, Client client) {
this.clusterService = clusterService; this.clusterService = clusterService;
this.scriptService = scriptService; this.scriptService = scriptService;
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
this.processorFactories = processorFactories( this.processorFactories = processorFactories(
ingestPlugins, ingestPlugins,
new Processor.Parameters( new Processor.Parameters(
@ -106,35 +99,7 @@ public class IngestService implements ClusterStateApplier {
threadPool.getThreadContext(), threadPool::relativeTimeInMillis, threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule( (delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this, indexExpression -> { ), this, client));
ClusterState state = clusterService.state();
Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
if (resolvedIndices.length != 1) {
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
}
Index index = resolvedIndices[0];
// check if indexExpression matches with an alias that has a filter
// There is no guarantee that alias filters are applied, so fail if this is the case.
Set<String> indicesAndAliases = resolver.resolveExpressions(state, indexExpression);
String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases);
if (aliasesWithFilter != null && aliasesWithFilter.length > 0) {
throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter");
}
IndexService indexService = indicesService.indexServiceSafe(index);
int numShards = indexService.getMetaData().getNumberOfShards();
if (numShards != 1) {
throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards +
" shards");
}
IndexShard indexShard = indexService.getShard(0);
IndexMetaData imd = state.metaData().index(index);
return new Tuple<>(imd, indexShard.acquireSearcher("ingest"));
}
)
);
this.threadPool = threadPool; this.threadPool = threadPool;
} }

View File

@ -19,18 +19,15 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import java.util.Map; import java.util.Map;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
/** /**
@ -115,17 +112,13 @@ public interface Processor {
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler; public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
/** /**
* Provides access to an engine searcher of a locally allocated index specified for the provided index. * Provides access to the client
* The input of this function is an index expression and this function returns the {@link IndexMetaData}
* of the resolved locally allocated index and {@link Engine.Searcher} instance for the resolved index.
*
* The locally allocated index must be have a single primary shard.
*/ */
public final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher; public final Client client;
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler, LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) { IngestService ingestService, Client client) {
this.env = env; this.env = env;
this.scriptService = scriptService; this.scriptService = scriptService;
this.threadContext = threadContext; this.threadContext = threadContext;
@ -133,7 +126,7 @@ public interface Processor {
this.relativeTimeSupplier = relativeTimeSupplier; this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler; this.scheduler = scheduler;
this.ingestService = ingestService; this.ingestService = ingestService;
this.localShardSearcher = localShardSearcher; this.client = client;
} }
} }

View File

@ -432,7 +432,7 @@ public class Node implements Closeable {
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), indicesService); pluginsService.filterPlugins(IngestPlugin.class), client);
final AliasValidator aliasValidator = new AliasValidator(); final AliasValidator aliasValidator = new AliasValidator();

View File

@ -1,271 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(TestPlugin.class);
}
public void testLocalShardSearcher() throws Exception {
client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
public void testMultipleIndicesAreResolved() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index")));
createIndex("reference-index2", client().admin().indices().prepareCreate("reference-index2")
.addAlias(new Alias("reference-index")));
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] can only point to a single concrete index"));
}
public void testMoreThanOnePrimaryShard() throws Exception {
createIndex("reference-index", Settings.builder().put("index.number_of_shards", 2).build());
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("index [reference-index] must have 1 shard, but has 2 shards"));
}
public void testFailWithFilteredAlias() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index").filter(QueryBuilders.matchAllQuery())));
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] points an alias with a filter"));
}
public void testWithAlias() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index")));
client().index(new IndexRequest("reference-index1").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index1")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
public void testSearchWrapperIsApplied() throws Exception {
client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
private static BytesReference createPipelineSource() throws IOException {
return BytesReference.bytes(jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject(TestProcessor.NAME)
.endObject()
.endObject()
.endArray()
.endObject());
}
public static class TestPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(TestProcessor.NAME, new TestProcessor.Factory(parameters.localShardSearcher));
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.setSearcherWrapper(indexService -> new IndexSearcherWrapper() {
@Override
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new TestDirectyReader(reader);
}
});
}
}
static class TestProcessor extends AbstractProcessor {
static final String NAME = "test_processor";
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
TestProcessor(String tag, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
super(tag);
this.localShardSearcher = localShardSearcher;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String indexExpression = "reference-index";
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) {
// Ensure that search wrapper has been invoked by checking the directory instance type:
if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) {
// asserting or throwing a AssertionError makes this test hang:
// so just throw a runtime exception here:
throw new RuntimeException("unexpected directory instance type");
}
Document document = engineSearcher.searcher().doc(0);
ingestDocument.setFieldValue("id", Uid.decodeId(document.getBinaryValue("_id").bytes));
}
return ingestDocument;
}
@Override
public String getType() {
return NAME;
}
static class Factory implements Processor.Factory {
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
Factory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
this.localShardSearcher = localShardSearcher;
}
@Override
public Processor create(Map<String, Processor.Factory> processorFactories,
String tag, Map<String, Object> config) throws Exception {
return new TestProcessor(tag, localShardSearcher);
}
}
}
static class TestDirectyReader extends FilterDirectoryReader {
TestDirectyReader(DirectoryReader in) throws IOException {
super(in, new TestSubReaderWrapper());
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new TestDirectyReader(in);
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
static class TestSubReaderWrapper extends SubReaderWrapper {
@Override
public LeafReader wrap(LeafReader reader) {
return new TestLeafReader(reader);
}
}
static class TestLeafReader extends FilterLeafReader {
TestLeafReader(LeafReader in) {
super(in);
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
}
}
}

View File

@ -1140,7 +1140,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
new IngestService( new IngestService(
clusterService, threadPool, environment, scriptService, clusterService, threadPool, environment, scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList(), null), Collections.emptyList(), client),
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
)); ));

View File

@ -72,7 +72,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
@Override @Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher); EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client);
parameters.ingestService.addIngestClusterStateListener(factory); parameters.ingestService.addIngestClusterStateListener(factory);
return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory); return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory);
} }

View File

@ -5,10 +5,8 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@ -17,17 +15,16 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> { final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> {
static final String TYPE = "enrich"; static final String TYPE = "enrich";
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider; private final Client client;
volatile Map<String, EnrichPolicy> policies = Collections.emptyMap(); volatile Map<String, EnrichPolicy> policies = Collections.emptyMap();
EnrichProcessorFactory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) { EnrichProcessorFactory(Client client) {
this.searchProvider = searchProvider; this.client = client;
} }
@Override @Override
@ -57,7 +54,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
switch (policy.getType()) { switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE: case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, searchProvider, policyName, enrichKey, ignoreMissing, specifications); return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, specifications);
default: default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
} }

View File

@ -5,35 +5,28 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.apache.lucene.document.Document; import org.elasticsearch.action.search.SearchRequest;
import org.apache.lucene.index.LeafReader; import org.elasticsearch.action.search.SearchResponse;
import org.apache.lucene.index.PostingsEnum; import org.elasticsearch.client.Client;
import org.apache.lucene.index.Terms; import org.elasticsearch.cluster.routing.Preference;
import org.apache.lucene.index.TermsEnum; import org.elasticsearch.common.CheckedFunction;
import org.apache.lucene.util.BytesRef; import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
final class ExactMatchProcessor extends AbstractProcessor { final class ExactMatchProcessor extends AbstractProcessor {
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field"; static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider; private final CheckedFunction<SearchRequest, SearchResponse, Exception> searchRunner;
private final String policyName; private final String policyName;
private final String enrichKey; private final String enrichKey;
@ -41,13 +34,22 @@ final class ExactMatchProcessor extends AbstractProcessor {
private final List<EnrichSpecification> specifications; private final List<EnrichSpecification> specifications;
ExactMatchProcessor(String tag, ExactMatchProcessor(String tag,
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider, Client client,
String policyName,
String enrichKey,
boolean ignoreMissing,
List<EnrichSpecification> specifications) {
this(tag, (req) -> client.search(req).actionGet(), policyName, enrichKey, ignoreMissing, specifications);
}
ExactMatchProcessor(String tag,
CheckedFunction<SearchRequest, SearchResponse, Exception> searchRunner,
String policyName, String policyName,
String enrichKey, String enrichKey,
boolean ignoreMissing, boolean ignoreMissing,
List<EnrichSpecification> specifications) { List<EnrichSpecification> specifications) {
super(tag); super(tag);
this.searchProvider = searchProvider; this.searchRunner = searchRunner;
this.policyName = policyName; this.policyName = policyName;
this.enrichKey = enrichKey; this.enrichKey = enrichKey;
this.ignoreMissing = ignoreMissing; this.ignoreMissing = ignoreMissing;
@ -56,49 +58,45 @@ final class ExactMatchProcessor extends AbstractProcessor {
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// If a document does not have the enrich key, return the unchanged document
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
if (value == null) { if (value == null) {
return ingestDocument; return ingestDocument;
} }
// TODO: re-use the engine searcher between enriching documents from the same write request TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
Tuple<IndexMetaData, Engine.Searcher> tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName)); ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
String enrichKeyField = getEnrichKeyField(tuple.v1()); // TODO: Use a custom transport action instead of the search API
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.size(1);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null);
searchBuilder.query(constantScore);
try (Engine.Searcher engineSearcher = tuple.v2()) { SearchRequest req = new SearchRequest();
if (engineSearcher.getDirectoryReader().leaves().size() == 0) { req.indices(EnrichPolicy.getBaseName(policyName));
return ingestDocument; req.preference(Preference.LOCAL.type());
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) { req.source(searchBuilder);
throw new IllegalStateException("enrich index must have exactly a single segment");
}
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader(); // TODO: Make this Async
final Terms terms = leafReader.terms(enrichKeyField); SearchResponse searchResponse = searchRunner.apply(req);
if (terms == null) {
throw new IllegalStateException("enrich key field does not exist");
}
final TermsEnum tenum = terms.iterator(); // If the index is empty, return the unchanged document
if (tenum.seekExact(new BytesRef(value))) { // If the enrich key does not exist in the index, throw an error
PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE); // If no documents match the key, return the unchanged document
final int docId = penum.nextDoc(); SearchHit[] searchHits = searchResponse.getHits().getHits();
assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]"; if (searchHits.length < 1) {
assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]"; return ingestDocument;
} else if (searchHits.length > 1) {
throw new IllegalStateException("more than one doc id matching for [" + enrichKey + "]");
}
// TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521) // If a document is returned, add its fields to the document
Document document = leafReader.document(docId, Collections.singleton(SourceFieldMapper.NAME)); Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME); assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
assert source != null; for (EnrichSpecification specification : specifications) {
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
final BytesReference encoded = new BytesArray(source); ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
final Map<String, Object> decoded =
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
// TODO: add support over overwrite option (like in SetProcessor)
ingestDocument.setFieldValue(specification.targetField, enrichValue);
}
}
} }
return ingestDocument; return ingestDocument;
} }
@ -123,22 +121,4 @@ final class ExactMatchProcessor extends AbstractProcessor {
List<EnrichSpecification> getSpecifications() { List<EnrichSpecification> getSpecifications() {
return specifications; return specifications;
} }
private static String getEnrichKeyField(IndexMetaData imd) {
if (imd == null) {
throw new IllegalStateException("enrich index is missing");
}
Map<String, Object> mappingSource = imd.mapping().getSourceAsMap();
Map<?, ?> meta = (Map<?, ?>) mappingSource.get("_meta");
if (meta == null) {
throw new IllegalStateException("_meta field is missing in enrich index");
}
String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME);
if (fieldName == null) {
throw new IllegalStateException("enrich key fieldname missing");
}
return fieldName;
}
} }

View File

@ -5,278 +5,216 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.search.TotalHits;
import org.apache.lucene.document.Document; import org.apache.lucene.util.SetOnce;
import org.apache.lucene.document.Field; import org.elasticsearch.action.search.SearchRequest;
import org.apache.lucene.document.StoredField; import org.elasticsearch.action.search.SearchResponse;
import org.apache.lucene.document.StringField; import org.elasticsearch.action.search.SearchResponseSections;
import org.apache.lucene.index.DirectoryReader; import org.elasticsearch.action.search.ShardSearchFailure;
import org.apache.lucene.index.IndexReader; import org.elasticsearch.cluster.routing.Preference;
import org.apache.lucene.index.IndexWriter; import org.elasticsearch.common.CheckedFunction;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME; import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
public class ExactMatchProcessorTests extends ESTestCase { public class ExactMatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception { public void testBasics() throws Exception {
try (Directory directory = newDirectory()) { Map<String, Map<String, ?>> documents = new HashMap<>();
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); {
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); Map<String, Object> document1 = new HashMap<>();
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { document1.put("globalRank", 451);
indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com")); document1.put("tldRank",23);
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co")); document1.put("tld", "co");
indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk")); documents.put("elastic.co", document1);
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co"));
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "eops.nl"));
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(80));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("nl"));
}
}
} }
MockSearchFunction mockSearch = mockedSearchFunction(documents);
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
// Run
assertThat(processor.execute(ingestDocument), notNullValue());
// Check request
SearchRequest request = mockSearch.getCapturedRequest();
assertThat(request.indices().length, equalTo(1));
assertThat(request.indices()[0], equalTo(".enrich-_name"));
assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.co"));
// Check result
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co"));
} }
public void testNoMatch() throws Exception { public void testNoMatch() throws Exception {
try (Directory directory = newDirectory()) { MockSearchFunction mockSearch = mockedSearchFunction();
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com"));
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co"));
indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk"));
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties));
}
}
}
}
public void testMoreThanOneSegment() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co"));
indexWriter.commit();
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("enrich index must have exactly a single segment"));
}
}
}
}
public void testEmptyIndex() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.commit();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties));
}
}
}
}
public void testEnrichKeyFieldMissing() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
Document document = new Document();
document.add(new StringField("different_key", "elastic.co", Field.Store.NO));
indexWriter.addDocument(document);
indexWriter.commit();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("enrich key field does not exist"));
}
}
}
}
public void testIndexMetadataMissing() {
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(null, null);
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co")); Collections.singletonMap("domain", "elastic.com"));
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); int numProperties = ingestDocument.getSourceAndMetadata().size();
// Run
assertThat(processor.execute(ingestDocument), notNullValue());
// Check request
SearchRequest request = mockSearch.getCapturedRequest();
assertThat(request.indices().length, equalTo(1));
assertThat(request.indices()[0], equalTo(".enrich-_name"));
assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.com"));
// Check result
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties));
} }
public void testMetaFieldMissing() throws Exception { public void testSearchFailure() throws Exception {
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) String indexName = ".enrich-_name";
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
IndexMetaData imd = IndexMetaData.builder("majestic_index")
.settings(indexSettings)
.putMapping("_doc", "{}")
.build();
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(imd, null);
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co")); Collections.singletonMap("domain", "elastic.com"));
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); // Run
} IndexNotFoundException expectedException = expectThrows(IndexNotFoundException.class, () -> processor.execute(ingestDocument));
// Check request
public void testEnrichKeyFieldNameMissing() throws Exception { SearchRequest request = mockSearch.getCapturedRequest();
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) assertThat(request.indices().length, equalTo(1));
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) assertThat(request.indices()[0], equalTo(".enrich-_name"));
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
IndexMetaData imd = IndexMetaData.builder("majestic_index") assertThat(request.source().size(), equalTo(1));
.settings(indexSettings) assertThat(request.source().trackScores(), equalTo(false));
.putMapping("_doc", "{\"_meta\": {}}") assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
.build(); assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
Function<String, Tuple<IndexMetaData, Engine.Searcher>> provider = indexExpression -> new Tuple<>(imd, null); assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.com"));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, // Check result
Collections.singletonMap("domain", "elastic.co")); assertThat(expectedException.getMessage(), equalTo("no such index [" + indexName + "]"));
expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
} }
public void testIgnoreKeyMissing() throws Exception { public void testIgnoreKeyMissing() throws Exception {
{ {
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", ndexExpression -> null, "_name", "domain", ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); Collections.emptyMap());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(processor.execute(ingestDocument), notNullValue()); assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
} }
{ {
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", indexExpression -> null, "_name", "domain", ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); Collections.emptyMap());
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
} }
} }
private static Document createEnrichDocument(String key, Object... decorateValues) throws IOException { private static final class MockSearchFunction implements CheckedFunction<SearchRequest, SearchResponse, Exception> {
assert decorateValues.length % 2 ==0; private final SearchResponse mockResponse;
private final SetOnce<SearchRequest> capturedRequest;
private final Exception exception;
BytesReference decorateContent; MockSearchFunction(SearchResponse mockResponse) {
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { this.mockResponse = mockResponse;
Map<String, Object> map = new HashMap<>(); this.exception = null;
for (int i = 0; i < decorateValues.length; i += 2) { this.capturedRequest = new SetOnce<>();
map.put((String) decorateValues[i], decorateValues[i + 1]); }
}
builder.map(map); MockSearchFunction(Exception exception) {
builder.flush(); this.mockResponse = null;
ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); this.exception = exception;
decorateContent = new BytesArray(outputStream.toByteArray()); this.capturedRequest = new SetOnce<>();
}
@Override
public SearchResponse apply(SearchRequest request) throws Exception {
capturedRequest.set(request);
if (exception != null) {
throw exception;
} else {
return mockResponse;
}
}
SearchRequest getCapturedRequest() {
return capturedRequest.get();
} }
Document document = new Document();
document.add(new StringField("key", key, Field.Store.NO));
document.add(new StoredField(SourceFieldMapper.NAME, decorateContent.toBytesRef()));
return document;
} }
private static Function<String, Tuple<IndexMetaData, Engine.Searcher>> createSearchProvider(IndexReader indexReader) throws Exception { public MockSearchFunction mockedSearchFunction() {
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) return new MockSearchFunction(mockResponse(Collections.emptyMap()));
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) }
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexMetaData imd = IndexMetaData.builder("majestic_index")
.settings(indexSettings)
.putMapping("_doc", "{\"_meta\": {\"" + ENRICH_KEY_FIELD_NAME +"\": \"key\"}}")
.build();
IndexSearcher indexSearcher = new IndexSearcher(indexReader); public MockSearchFunction mockedSearchFunction(Exception exception) {
Engine.Searcher searcher = new Engine.Searcher("_enrich", indexSearcher, indexReader); return new MockSearchFunction(exception);
return indexExpression -> new Tuple<>(imd, searcher); }
public MockSearchFunction mockedSearchFunction(Map<String, Map<String, ?>> documents) {
return new MockSearchFunction(mockResponse(documents));
}
public SearchResponse mockResponse(Map<String, Map<String, ?>> documents) {
SearchHit[] searchHits = documents.entrySet().stream().map(e -> {
SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap());
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
builder.map(e.getValue());
builder.flush();
ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream();
searchHit.sourceRef(new BytesArray(outputStream.toByteArray()));
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
return searchHit;
}).toArray(SearchHit[]::new);
return new SearchResponse(new SearchResponseSections(
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
} }
} }