diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index c7083e7bbb6..914ca719332 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -19,12 +20,17 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; @@ -36,6 +42,7 @@ import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -95,6 +102,16 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { ); } + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry, Environment environment, + NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { + EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(clusterService, client, threadPool, + new IndexNameExpressionResolver(), System::currentTimeMillis); + return Collections.singleton(enrichPolicyExecutor); + } + @Override public List getNamedWriteables() { return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java new file mode 100644 index 00000000000..369729d4f41 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.enrich; + +import java.util.function.LongSupplier; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; + +class EnrichPolicyExecutor { + + private final ClusterService clusterService; + private final Client client; + private final ThreadPool threadPool; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final LongSupplier nowSupplier; + + EnrichPolicyExecutor(ClusterService clusterService, Client client, ThreadPool threadPool, + IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) { + this.clusterService = clusterService; + this.client = client; + this.threadPool = threadPool; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.nowSupplier = nowSupplier; + } + + public void runPolicy(String policyId, ActionListener listener) { + // Look up policy in policy store and execute it + EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state()); + if (policy == null) { + throw new ElasticsearchException("Policy execution failed. Could not locate policy with id [{}]", policyId); + } else { + runPolicy(policyId, policy, listener); + } + } + + public void runPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { + EnrichPolicyRunner runnable = + new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier); + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java new file mode 100644 index 00000000000..e39e19c5fcd --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -0,0 +1,262 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.LongSupplier; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; + +public class EnrichPolicyRunner implements Runnable { + + private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class); + + private static final String ENRICH_INDEX_NAME_BASE = ".enrich-"; + + private final String policyName; + private final EnrichPolicy policy; + private final ActionListener listener; + private final ClusterService clusterService; + private final Client client; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final LongSupplier nowSupplier; + + EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener, + ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver, + LongSupplier nowSupplier) { + this.policyName = policyName; + this.policy = policy; + this.listener = listener; + this.clusterService = clusterService; + this.client = client; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.nowSupplier = nowSupplier; + } + + @Override + public void run() { + // Collect the source index information + logger.info("Policy [{}]: Running enrich policy", policyName); + final String sourceIndexPattern = policy.getIndexPattern(); + logger.debug("Policy [{}]: Checking source index [{}]", policyName, sourceIndexPattern); + GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndexPattern); + client.admin().indices().getIndex(getIndexRequest, new ActionListener() { + @Override + public void onResponse(GetIndexResponse getIndexResponse) { + validateMappings(getIndexResponse); + prepareAndCreateEnrichIndex(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private Map getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) { + ImmutableOpenMap> mappings = getIndexResponse.mappings(); + ImmutableOpenMap indexMapping = mappings.get(sourceIndexName); + assert indexMapping.keys().size() == 1 : "Expecting only one type per index"; + MappingMetaData typeMapping = indexMapping.iterator().next().value; + return typeMapping.sourceAsMap(); + } + + private void validateMappings(final GetIndexResponse getIndexResponse) { + String[] sourceIndices = getIndexResponse.getIndices(); + logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices); + for (String sourceIndex : sourceIndices) { + Map mapping = getMappings(getIndexResponse, sourceIndex); + Set properties = ((Map) mapping.get("properties")).keySet(); + if (properties == null) { + listener.onFailure( + new ElasticsearchException( + "Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]", + policyName, sourceIndex, policy.getIndexPattern())); + } + if (properties.contains(policy.getEnrichKey()) == false) { + listener.onFailure( + new ElasticsearchException( + "Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]", + policyName, policy.getEnrichKey(), sourceIndex)); + } + } + } + + private String getEnrichIndexBase(final String policyName) { + return ENRICH_INDEX_NAME_BASE + policyName; + } + + private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { + // Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type + String keyType; + if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) { + keyType = "keyword"; + } else { + throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); + } + + // Disable _source on enrich index. Explicitly mark key mapping type. + try { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject() + .startObject(MapperService.SINGLE_MAPPING_NAME) + .field("dynamic", false) + .startObject("_source") + .field("enabled", true) + .endObject() + .startObject("properties") + .startObject(policy.getEnrichKey()) + .field("type", keyType) + .field("doc_values", false) + .endObject() + .endObject() + .endObject() + .endObject(); + + return builder; + } catch (IOException ioe) { + throw new UncheckedIOException("Could not render enrich mapping", ioe); + } + } + + private void prepareAndCreateEnrichIndex() { + long nowTimestamp = nowSupplier.getAsLong(); + String enrichIndexName = getEnrichIndexBase(policyName) + "-" + nowTimestamp; + Settings enrichIndexSettings = Settings.builder() + .put("index.auto_expand_replicas", "0-all") + .build(); + CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); + createEnrichIndexRequest.mapping(MapperService.SINGLE_MAPPING_NAME, resolveEnrichMapping(policy)); + logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); + client.admin().indices().create(createEnrichIndexRequest, new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + transferDataToEnrichIndex(enrichIndexName); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private void transferDataToEnrichIndex(final String destinationIndexName) { + logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName); + // Filter down the source fields to just the ones required by the policy + final Set retainFields = new HashSet<>(); + retainFields.add(policy.getEnrichKey()); + retainFields.addAll(policy.getEnrichValues()); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource(retainFields.toArray(new String[0]), new String[0]); + if (policy.getQuery() != null) { + searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery())); + } + ReindexRequest reindexRequest = new ReindexRequest() + .setDestIndex(destinationIndexName) + .setSourceIndices(policy.getIndexPattern()); + reindexRequest.getSearchRequest().source(searchSourceBuilder); + reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE); + client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + // Do we want to fail the request if there were failures during the reindex process? + if (bulkByScrollResponse.getBulkFailures().size() > 0) { + listener.onFailure(new ElasticsearchException("Encountered bulk failures during reindex process")); + } else if (bulkByScrollResponse.getSearchFailures().size() > 0) { + listener.onFailure(new ElasticsearchException("Encountered search failures during reindex process")); + } else { + logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName, + bulkByScrollResponse.getCreated(), destinationIndexName); + refreshEnrichIndex(destinationIndexName); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private void refreshEnrichIndex(final String destinationIndexName) { + logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName); + client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener() { + @Override + public void onResponse(RefreshResponse refreshResponse) { + updateEnrichPolicyAlias(destinationIndexName); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private void updateEnrichPolicyAlias(final String destinationIndexName) { + String enrichIndexBase = getEnrichIndexBase(policyName); + logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase); + GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase); + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest); + ImmutableOpenMap> aliases = + clusterService.state().metaData().findAliases(aliasRequest, concreteIndices); + IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest(); + String[] indices = aliases.keys().toArray(String.class); + if (indices.length > 0) { + aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().indices(indices).alias(enrichIndexBase)); + } + aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(destinationIndexName).alias(enrichIndexBase)); + client.admin().indices().aliases(aliasToggleRequest, new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + logger.info("Policy [{}]: Policy execution complete", policyName); + listener.onResponse(new PolicyExecutionResult(true)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java new file mode 100644 index 00000000000..faa48c87521 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +public class PolicyExecutionResult { + private final boolean completed; + + public PolicyExecutionResult(boolean completed) { + this.completed = completed; + } + + public boolean isCompleted() { + return completed; + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java new file mode 100644 index 00000000000..3d42239910a --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -0,0 +1,265 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; + +public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(ReindexPlugin.class); + } + + public void testRunner() throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id("id") + .source( + "{" + + "\"field1\":\"value1\"," + + "\"field2\":2," + + "\"field3\":\"ignored\"," + + "\"field4\":\"ignored\"," + + "\"field5\":\"value5\"" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).get(); + + logger.info("Status: " + indexRequest.status().getStatus()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).get(); + + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + + logger.info("Created Doc"); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + + final long createTime = randomNonNegativeLong(); + + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + List enrichFields = new ArrayList<>(); + enrichFields.add("field2"); + enrichFields.add("field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, sourceIndex, "field1", enrichFields, ""); + String policyName = "test1"; + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(PolicyExecutionResult policyExecutionResult) { + logger.info("Run complete"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.warn("Run failed"); + exception.set(e); + latch.countDown(); + } + }; + + EnrichPolicyRunner enrichPolicyRunner = + new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime); + + logger.info("Starting policy run"); + + enrichPolicyRunner.run(); + + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices() + .getIndex(new GetIndexRequest().indices(".enrich-test1")).get(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("field1"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("keyword"))); + assertThat(field1.get("doc_values"), is(false)); + + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).get(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(3))); + assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); + assertThat(enrichDocument.get("field2"), is(equalTo(2))); + assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + } + + public void testRunnerMultiSource() throws Exception { + String baseSourceName = "source-index-"; + int numberOfSourceIndices = 3; + for (int idx = 0; idx < numberOfSourceIndices; idx++) { + final String sourceIndex = baseSourceName + idx; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id(randomAlphaOfLength(10)) + .source( + "{" + + "\"idx\":" + idx + "," + + "\"field1\":\"value1\"," + + "\"field2\":2," + + "\"field3\":\"ignored\"," + + "\"field4\":\"ignored\"," + + "\"field5\":\"value5\"" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + + logger.info("Status: " + indexRequest.status().getStatus()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("idx"), is(equalTo(idx))); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + + logger.info("Created Docs"); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + + final long createTime = randomNonNegativeLong(); + + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + String sourceIndexPattern = baseSourceName + "*"; + List enrichFields = new ArrayList<>(); + enrichFields.add("idx"); + enrichFields.add("field2"); + enrichFields.add("field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, sourceIndexPattern, "field1", enrichFields, ""); + String policyName = "test1"; + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(PolicyExecutionResult policyExecutionResult) { + logger.info("Run complete"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.warn("Run failed"); + exception.set(e); + latch.countDown(); + } + }; + + EnrichPolicyRunner enrichPolicyRunner = + new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime); + + logger.info("Starting policy run"); + + enrichPolicyRunner.run(); + + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("field1"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("keyword"))); + assertThat(field1.get("doc_values"), is(false)); + + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).get(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(4))); + assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); + assertThat(enrichDocument.get("field2"), is(equalTo(2))); + assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + } +}