Backport 7.x - Add enrich policy runner (#41088) (#41759)

Backports #41088

Adds the foundation of the execution logic to execute an enrich policy. Validates
the source index existence as well as mappings, creates a new enrich index for
the policy, reindexes the source index into the new enrich index, and swaps the 
enrich alias for the policy to the new index.
This commit is contained in:
James Baiera 2019-05-06 10:32:46 -04:00 committed by GitHub
parent 5d53706310
commit c25736c410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 613 additions and 0 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -19,12 +20,17 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@ -36,6 +42,7 @@ import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -95,6 +102,16 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
); );
} }
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(clusterService, client, threadPool,
new IndexNameExpressionResolver(), System::currentTimeMillis);
return Collections.singleton(enrichPolicyExecutor);
}
@Override @Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() { public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE,

View File

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import java.util.function.LongSupplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
class EnrichPolicyExecutor {
private final ClusterService clusterService;
private final Client client;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
EnrichPolicyExecutor(ClusterService clusterService, Client client, ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) {
this.clusterService = clusterService;
this.client = client;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
}
public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
// Look up policy in policy store and execute it
EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state());
if (policy == null) {
throw new ElasticsearchException("Policy execution failed. Could not locate policy with id [{}]", policyId);
} else {
runPolicy(policyId, policy, listener);
}
}
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
EnrichPolicyRunner runnable =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
}
}

View File

@ -0,0 +1,262 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
public class EnrichPolicyRunner implements Runnable {
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
private static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
private final String policyName;
private final EnrichPolicy policy;
private final ActionListener<PolicyExecutionResult> listener;
private final ClusterService clusterService;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
this.policyName = policyName;
this.policy = policy;
this.listener = listener;
this.clusterService = clusterService;
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
}
@Override
public void run() {
// Collect the source index information
logger.info("Policy [{}]: Running enrich policy", policyName);
final String sourceIndexPattern = policy.getIndexPattern();
logger.debug("Policy [{}]: Checking source index [{}]", policyName, sourceIndexPattern);
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndexPattern);
client.admin().indices().getIndex(getIndexRequest, new ActionListener<GetIndexResponse>() {
@Override
public void onResponse(GetIndexResponse getIndexResponse) {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex();
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getIndexResponse.mappings();
ImmutableOpenMap<String, MappingMetaData> indexMapping = mappings.get(sourceIndexName);
assert indexMapping.keys().size() == 1 : "Expecting only one type per index";
MappingMetaData typeMapping = indexMapping.iterator().next().value;
return typeMapping.sourceAsMap();
}
private void validateMappings(final GetIndexResponse getIndexResponse) {
String[] sourceIndices = getIndexResponse.getIndices();
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
Set<?> properties = ((Map<?, ?>) mapping.get("properties")).keySet();
if (properties == null) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndexPattern()));
}
if (properties.contains(policy.getEnrichKey()) == false) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]",
policyName, policy.getEnrichKey(), sourceIndex));
}
}
}
private String getEnrichIndexBase(final String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type
String keyType;
if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
}
// Disable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.field("dynamic", false)
.startObject("_source")
.field("enabled", true)
.endObject()
.startObject("properties")
.startObject(policy.getEnrichKey())
.field("type", keyType)
.field("doc_values", false)
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
} catch (IOException ioe) {
throw new UncheckedIOException("Could not render enrich mapping", ioe);
}
}
private void prepareAndCreateEnrichIndex() {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = getEnrichIndexBase(policyName) + "-" + nowTimestamp;
Settings enrichIndexSettings = Settings.builder()
.put("index.auto_expand_replicas", "0-all")
.build();
CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
createEnrichIndexRequest.mapping(MapperService.SINGLE_MAPPING_NAME, resolveEnrichMapping(policy));
logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
client.admin().indices().create(createEnrichIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
transferDataToEnrichIndex(enrichIndexName);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void transferDataToEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
// Filter down the source fields to just the ones required by the policy
final Set<String> retainFields = new HashSet<>();
retainFields.add(policy.getEnrichKey());
retainFields.addAll(policy.getEnrichValues());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(retainFields.toArray(new String[0]), new String[0]);
if (policy.getQuery() != null) {
searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery()));
}
ReindexRequest reindexRequest = new ReindexRequest()
.setDestIndex(destinationIndexName)
.setSourceIndices(policy.getIndexPattern());
reindexRequest.getSearchRequest().source(searchSourceBuilder);
reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
// Do we want to fail the request if there were failures during the reindex process?
if (bulkByScrollResponse.getBulkFailures().size() > 0) {
listener.onFailure(new ElasticsearchException("Encountered bulk failures during reindex process"));
} else if (bulkByScrollResponse.getSearchFailures().size() > 0) {
listener.onFailure(new ElasticsearchException("Encountered search failures during reindex process"));
} else {
logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName,
bulkByScrollResponse.getCreated(), destinationIndexName);
refreshEnrichIndex(destinationIndexName);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void refreshEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName);
client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
updateEnrichPolicyAlias(destinationIndexName);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void updateEnrichPolicyAlias(final String destinationIndexName) {
String enrichIndexBase = getEnrichIndexBase(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
ImmutableOpenMap<String, List<AliasMetaData>> aliases =
clusterService.state().metaData().findAliases(aliasRequest, concreteIndices);
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
String[] indices = aliases.keys().toArray(String.class);
if (indices.length > 0) {
aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().indices(indices).alias(enrichIndexBase));
}
aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(destinationIndexName).alias(enrichIndexBase));
client.admin().indices().aliases(aliasToggleRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("Policy [{}]: Policy execution complete", policyName);
listener.onResponse(new PolicyExecutionResult(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
public class PolicyExecutionResult {
private final boolean completed;
public PolicyExecutionResult(boolean completed) {
this.completed = completed;
}
public boolean isCompleted() {
return completed;
}
}

View File

@ -0,0 +1,265 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(ReindexPlugin.class);
}
public void testRunner() throws Exception {
final String sourceIndex = "source-index";
IndexResponse indexRequest = client().index(new IndexRequest()
.index(sourceIndex)
.id("id")
.source(
"{" +
"\"field1\":\"value1\"," +
"\"field2\":2," +
"\"field3\":\"ignored\"," +
"\"field4\":\"ignored\"," +
"\"field5\":\"value5\"" +
"}",
XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
).get();
logger.info("Status: " + indexRequest.status().getStatus());
SearchResponse sourceSearchResponse = client().search(
new SearchRequest(sourceIndex)
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).get();
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
assertNotNull(sourceDocMap);
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
logger.info("Created Doc");
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
List<String> enrichFields = new ArrayList<>();
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, sourceIndex, "field1", enrichFields, "");
String policyName = "test1";
ActionListener<PolicyExecutionResult> listener = new ActionListener<PolicyExecutionResult>() {
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) {
logger.info("Run complete");
latch.countDown();
}
@Override
public void onFailure(Exception e) {
logger.warn("Run failed");
exception.set(e);
latch.countDown();
}
};
EnrichPolicyRunner enrichPolicyRunner =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime);
logger.info("Starting policy run");
enrichPolicyRunner.run();
latch.await();
if (exception.get() != null) {
throw exception.get();
}
String createdEnrichIndex = ".enrich-test1-" + createTime;
GetIndexResponse enrichIndex = client().admin().indices()
.getIndex(new GetIndexRequest().indices(".enrich-test1")).get();
assertThat(enrichIndex.getIndices().length, equalTo(1));
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
assertNotNull(settings);
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
assertThat(mapping.get("dynamic"), is("false"));
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
assertNotNull(properties);
assertThat(properties.size(), is(equalTo(1)));
Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
assertNotNull(field1);
assertThat(field1.get("type"), is(equalTo("keyword")));
assertThat(field1.get("doc_values"), is(false));
SearchResponse enrichSearchResponse = client().search(
new SearchRequest(".enrich-test1")
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).get();
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
assertNotNull(enrichDocument);
assertThat(enrichDocument.size(), is(equalTo(3)));
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
}
public void testRunnerMultiSource() throws Exception {
String baseSourceName = "source-index-";
int numberOfSourceIndices = 3;
for (int idx = 0; idx < numberOfSourceIndices; idx++) {
final String sourceIndex = baseSourceName + idx;
IndexResponse indexRequest = client().index(new IndexRequest()
.index(sourceIndex)
.id(randomAlphaOfLength(10))
.source(
"{" +
"\"idx\":" + idx + "," +
"\"field1\":\"value1\"," +
"\"field2\":2," +
"\"field3\":\"ignored\"," +
"\"field4\":\"ignored\"," +
"\"field5\":\"value5\"" +
"}",
XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
).actionGet();
logger.info("Status: " + indexRequest.status().getStatus());
SearchResponse sourceSearchResponse = client().search(
new SearchRequest(sourceIndex)
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).actionGet();
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
assertNotNull(sourceDocMap);
assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
}
logger.info("Created Docs");
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
String sourceIndexPattern = baseSourceName + "*";
List<String> enrichFields = new ArrayList<>();
enrichFields.add("idx");
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, sourceIndexPattern, "field1", enrichFields, "");
String policyName = "test1";
ActionListener<PolicyExecutionResult> listener = new ActionListener<PolicyExecutionResult>() {
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) {
logger.info("Run complete");
latch.countDown();
}
@Override
public void onFailure(Exception e) {
logger.warn("Run failed");
exception.set(e);
latch.countDown();
}
};
EnrichPolicyRunner enrichPolicyRunner =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime);
logger.info("Starting policy run");
enrichPolicyRunner.run();
latch.await();
if (exception.get() != null) {
throw exception.get();
}
String createdEnrichIndex = ".enrich-test1-" + createTime;
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
assertThat(enrichIndex.getIndices().length, equalTo(1));
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
assertNotNull(settings);
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
assertThat(mapping.get("dynamic"), is("false"));
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
assertNotNull(properties);
assertThat(properties.size(), is(equalTo(1)));
Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
assertNotNull(field1);
assertThat(field1.get("type"), is(equalTo("keyword")));
assertThat(field1.get("doc_values"), is(false));
SearchResponse enrichSearchResponse = client().search(
new SearchRequest(".enrich-test1")
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).get();
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
assertNotNull(enrichDocument);
assertThat(enrichDocument.size(), is(equalTo(4)));
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
}
}