Add pipeline to ensure unique Enrich index documents (#46348)
Adds a pipeline that removes ids from documents before indexing them into enrich indices; routing values are dropped as well, by setting the reindex destination's routing to "discard". Enrich documents may come from multiple source indices, so their ids can collide. This pipeline ensures that documents with colliding id fields do not clobber one another during the reindex operation while executing an enrich policy.
This commit is contained in:
parent
0e1b77568a
commit
a66c0dcd95
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.enrich;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.ingest.IngestMetadata;
|
||||||
|
import org.elasticsearch.ingest.PipelineConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages the definitions and lifecycle of the ingest pipeline used by the reindex operation within the Enrich Policy execution.
|
||||||
|
*/
|
||||||
|
public class EnrichPolicyReindexPipeline {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current version of the pipeline definition. Used in the pipeline's name to differentiate from breaking changes
|
||||||
|
* (separate from product version).
|
||||||
|
*/
|
||||||
|
static final String CURRENT_PIPELINE_VERSION_NAME = "7";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The last version of the distribution that updated the pipelines definition.
|
||||||
|
* TODO: This should be the version of ES that Enrich first ships in, which likely doesn't exist yet.
|
||||||
|
*/
|
||||||
|
static final int ENRICH_PIPELINE_LAST_UPDATED_VERSION = Version.V_7_4_0.id;
|
||||||
|
|
||||||
|
static String pipelineName() {
|
||||||
|
return "enrich-policy-reindex-" + CURRENT_PIPELINE_VERSION_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the current version of the pipeline definition is installed in the cluster
|
||||||
|
* @param clusterState The cluster state to check
|
||||||
|
* @return true if a pipeline exists that is compatible with this version of Enrich, false otherwise
|
||||||
|
*/
|
||||||
|
static boolean exists(ClusterState clusterState) {
|
||||||
|
final IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE);
|
||||||
|
// we ensure that we both have the pipeline and its version represents the current (or later) version
|
||||||
|
if (ingestMetadata != null) {
|
||||||
|
final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName());
|
||||||
|
if (pipeline != null) {
|
||||||
|
Object version = pipeline.getConfigAsMap().get("version");
|
||||||
|
return version instanceof Number && ((Number) version).intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a pipeline with the current version's pipeline definition
|
||||||
|
* @param client Client used to execute put pipeline
|
||||||
|
* @param listener Callback used after pipeline has been created
|
||||||
|
*/
|
||||||
|
public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
|
||||||
|
final BytesReference pipeline = BytesReference.bytes(currentEnrichPipelineDefinition(XContentType.JSON));
|
||||||
|
final PutPipelineRequest request = new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON);
|
||||||
|
client.admin().cluster().putPipeline(request, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) {
|
||||||
|
try {
|
||||||
|
return XContentBuilder.builder(xContentType.xContent())
|
||||||
|
.startObject()
|
||||||
|
.field("description", "This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup " +
|
||||||
|
"purposes. It is an internal pipeline and should not be modified.")
|
||||||
|
.field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION)
|
||||||
|
.startArray("processors")
|
||||||
|
.startObject()
|
||||||
|
// remove the id from the document so that documents from multiple indices will always be unique.
|
||||||
|
.startObject("remove")
|
||||||
|
.field("field", "_id")
|
||||||
|
.endObject()
|
||||||
|
.endObject()
|
||||||
|
.endArray()
|
||||||
|
.endObject();
|
||||||
|
} catch (final IOException e) {
|
||||||
|
throw new UncheckedIOException("Failed to create pipeline for enrich document sanitization", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -250,7 +250,7 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
client.admin().indices().create(createEnrichIndexRequest, new ActionListener<CreateIndexResponse>() {
|
client.admin().indices().create(createEnrichIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(CreateIndexResponse createIndexResponse) {
|
public void onResponse(CreateIndexResponse createIndexResponse) {
|
||||||
transferDataToEnrichIndex(enrichIndexName);
|
prepareReindexOperation(enrichIndexName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -260,6 +260,25 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void prepareReindexOperation(final String destinationIndexName) {
|
||||||
|
// Check to make sure that the enrich pipeline exists, and create it if it is missing.
|
||||||
|
if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) {
|
||||||
|
EnrichPolicyReindexPipeline.create(client, new ActionListener<AcknowledgedResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
|
||||||
|
transferDataToEnrichIndex(destinationIndexName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
transferDataToEnrichIndex(destinationIndexName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void transferDataToEnrichIndex(final String destinationIndexName) {
|
private void transferDataToEnrichIndex(final String destinationIndexName) {
|
||||||
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
|
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
|
||||||
// Filter down the source fields to just the ones required by the policy
|
// Filter down the source fields to just the ones required by the policy
|
||||||
|
@ -277,6 +296,8 @@ public class EnrichPolicyRunner implements Runnable {
|
||||||
.setSourceIndices(policy.getIndices().toArray(new String[0]));
|
.setSourceIndices(policy.getIndices().toArray(new String[0]));
|
||||||
reindexRequest.getSearchRequest().source(searchSourceBuilder);
|
reindexRequest.getSearchRequest().source(searchSourceBuilder);
|
||||||
reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
|
reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
|
||||||
|
reindexRequest.getDestination().routing("discard");
|
||||||
|
reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName());
|
||||||
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
|
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
|
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
|
||||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||||
|
import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||||
|
@ -46,7 +47,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||||
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
|
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||||
|
import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
@ -59,7 +60,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
|
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.enrich;
|
package org.elasticsearch.xpack.enrich;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -42,6 +43,7 @@ import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||||
|
import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
@ -58,7 +60,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||||
return Collections.singletonList(ReindexPlugin.class);
|
return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRunner() throws Exception {
|
public void testRunner() throws Exception {
|
||||||
|
@ -165,6 +167,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
.source(
|
.source(
|
||||||
"{" +
|
"{" +
|
||||||
"\"idx\":" + idx + "," +
|
"\"idx\":" + idx + "," +
|
||||||
|
"\"key\":" + "\"key" + idx + "\"," +
|
||||||
"\"field1\":\"value1\"," +
|
"\"field1\":\"value1\"," +
|
||||||
"\"field2\":2," +
|
"\"field2\":2," +
|
||||||
"\"field3\":\"ignored\"," +
|
"\"field3\":\"ignored\"," +
|
||||||
|
@ -184,6 +187,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
|
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
|
||||||
assertNotNull(sourceDocMap);
|
assertNotNull(sourceDocMap);
|
||||||
assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
|
assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
|
||||||
|
assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx)));
|
||||||
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
|
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
|
||||||
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
|
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
|
||||||
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
|
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
|
||||||
|
@ -194,10 +198,11 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
String sourceIndexPattern = baseSourceName + "*";
|
String sourceIndexPattern = baseSourceName + "*";
|
||||||
List<String> enrichFields = new ArrayList<>();
|
List<String> enrichFields = new ArrayList<>();
|
||||||
enrichFields.add("idx");
|
enrichFields.add("idx");
|
||||||
|
enrichFields.add("field1");
|
||||||
enrichFields.add("field2");
|
enrichFields.add("field2");
|
||||||
enrichFields.add("field5");
|
enrichFields.add("field5");
|
||||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern),
|
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern),
|
||||||
"field1", enrichFields);
|
"key", enrichFields);
|
||||||
String policyName = "test1";
|
String policyName = "test1";
|
||||||
|
|
||||||
final long createTime = randomNonNegativeLong();
|
final long createTime = randomNonNegativeLong();
|
||||||
|
@ -229,10 +234,10 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
|
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
|
||||||
assertNotNull(properties);
|
assertNotNull(properties);
|
||||||
assertThat(properties.size(), is(equalTo(1)));
|
assertThat(properties.size(), is(equalTo(1)));
|
||||||
Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
|
Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
|
||||||
assertNotNull(field1);
|
assertNotNull(keyfield);
|
||||||
assertThat(field1.get("type"), is(equalTo("keyword")));
|
assertThat(keyfield.get("type"), is(equalTo("keyword")));
|
||||||
assertThat(field1.get("doc_values"), is(false));
|
assertThat(keyfield.get("doc_values"), is(false));
|
||||||
|
|
||||||
// Validate document structure
|
// Validate document structure
|
||||||
SearchResponse enrichSearchResponse = client().search(
|
SearchResponse enrichSearchResponse = client().search(
|
||||||
|
@ -242,7 +247,223 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
||||||
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
|
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
|
||||||
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
|
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
|
||||||
assertNotNull(enrichDocument);
|
assertNotNull(enrichDocument);
|
||||||
assertThat(enrichDocument.size(), is(equalTo(4)));
|
assertThat(enrichDocument.size(), is(equalTo(5)));
|
||||||
|
assertThat(enrichDocument.get("key"), is(equalTo("key0")));
|
||||||
|
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
|
||||||
|
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
|
||||||
|
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
|
||||||
|
|
||||||
|
// Validate segments
|
||||||
|
validateSegments(createdEnrichIndex, 3);
|
||||||
|
|
||||||
|
// Validate Index is read only
|
||||||
|
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunnerMultiSourceDocIdCollisions() throws Exception {
|
||||||
|
String baseSourceName = "source-index-";
|
||||||
|
int numberOfSourceIndices = 3;
|
||||||
|
String collidingDocId = randomAlphaOfLength(10);
|
||||||
|
for (int idx = 0; idx < numberOfSourceIndices; idx++) {
|
||||||
|
final String sourceIndex = baseSourceName + idx;
|
||||||
|
IndexResponse indexRequest = client().index(new IndexRequest()
|
||||||
|
.index(sourceIndex)
|
||||||
|
.id(collidingDocId)
|
||||||
|
.routing(collidingDocId + idx)
|
||||||
|
.source(
|
||||||
|
"{" +
|
||||||
|
"\"idx\":" + idx + "," +
|
||||||
|
"\"key\":" + "\"key" + idx + "\"," +
|
||||||
|
"\"field1\":\"value1\"," +
|
||||||
|
"\"field2\":2," +
|
||||||
|
"\"field3\":\"ignored\"," +
|
||||||
|
"\"field4\":\"ignored\"," +
|
||||||
|
"\"field5\":\"value5\"" +
|
||||||
|
"}",
|
||||||
|
XContentType.JSON)
|
||||||
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||||
|
).actionGet();
|
||||||
|
assertEquals(RestStatus.CREATED, indexRequest.status());
|
||||||
|
|
||||||
|
SearchResponse sourceSearchResponse = client().search(
|
||||||
|
new SearchRequest(sourceIndex)
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||||
|
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
|
||||||
|
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
|
||||||
|
assertNotNull(sourceDocMap);
|
||||||
|
assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
|
||||||
|
assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx)));
|
||||||
|
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
|
||||||
|
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
|
||||||
|
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
|
||||||
|
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
|
||||||
|
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
|
||||||
|
|
||||||
|
SearchResponse routingSearchResponse = client().search(
|
||||||
|
new SearchRequest(sourceIndex)
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet();
|
||||||
|
assertEquals(1L, routingSearchResponse.getHits().getTotalHits().value);
|
||||||
|
}
|
||||||
|
|
||||||
|
String sourceIndexPattern = baseSourceName + "*";
|
||||||
|
List<String> enrichFields = Arrays.asList("idx", "field1", "field2", "field5");
|
||||||
|
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key",
|
||||||
|
enrichFields);
|
||||||
|
String policyName = "test1";
|
||||||
|
|
||||||
|
final long createTime = randomNonNegativeLong();
|
||||||
|
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
|
||||||
|
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
|
||||||
|
|
||||||
|
logger.info("Starting policy run");
|
||||||
|
enrichPolicyRunner.run();
|
||||||
|
latch.await();
|
||||||
|
if (exception.get() != null) {
|
||||||
|
throw exception.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate Index definition
|
||||||
|
String createdEnrichIndex = ".enrich-test1-" + createTime;
|
||||||
|
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
|
||||||
|
assertThat(enrichIndex.getIndices().length, equalTo(1));
|
||||||
|
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
|
||||||
|
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
|
||||||
|
assertNotNull(settings);
|
||||||
|
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
|
||||||
|
|
||||||
|
// Validate Mapping
|
||||||
|
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
|
||||||
|
assertThat(mapping.get("dynamic"), is("false"));
|
||||||
|
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
|
||||||
|
assertNotNull(properties);
|
||||||
|
assertThat(properties.size(), is(equalTo(1)));
|
||||||
|
Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
|
||||||
|
assertNotNull(keyfield);
|
||||||
|
assertThat(keyfield.get("type"), is(equalTo("keyword")));
|
||||||
|
assertThat(keyfield.get("doc_values"), is(false));
|
||||||
|
|
||||||
|
// Validate document structure
|
||||||
|
SearchResponse enrichSearchResponse = client().search(
|
||||||
|
new SearchRequest(".enrich-test1")
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||||
|
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
|
||||||
|
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
|
||||||
|
assertNotNull(enrichDocument);
|
||||||
|
assertThat(enrichDocument.size(), is(equalTo(5)));
|
||||||
|
assertThat(enrichDocument.get("key"), is(equalTo("key0")));
|
||||||
|
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
|
||||||
|
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
|
||||||
|
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
|
||||||
|
|
||||||
|
// Validate removal of routing values
|
||||||
|
for (int idx = 0; idx < numberOfSourceIndices; idx++) {
|
||||||
|
SearchResponse routingSearchResponse = client().search(
|
||||||
|
new SearchRequest(".enrich-test1")
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet();
|
||||||
|
assertEquals(0L, routingSearchResponse.getHits().getTotalHits().value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate segments
|
||||||
|
validateSegments(createdEnrichIndex, 3);
|
||||||
|
|
||||||
|
// Validate Index is read only
|
||||||
|
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception {
|
||||||
|
String baseSourceName = "source-index-";
|
||||||
|
int numberOfSourceIndices = 3;
|
||||||
|
for (int idx = 0; idx < numberOfSourceIndices; idx++) {
|
||||||
|
final String sourceIndex = baseSourceName + idx;
|
||||||
|
IndexResponse indexRequest = client().index(new IndexRequest()
|
||||||
|
.index(sourceIndex)
|
||||||
|
.id(randomAlphaOfLength(10))
|
||||||
|
.source(
|
||||||
|
"{" +
|
||||||
|
"\"idx\":" + idx + "," +
|
||||||
|
"\"key\":" + "\"key\"," +
|
||||||
|
"\"field1\":\"value1\"," +
|
||||||
|
"\"field2\":2," +
|
||||||
|
"\"field3\":\"ignored\"," +
|
||||||
|
"\"field4\":\"ignored\"," +
|
||||||
|
"\"field5\":\"value5\"" +
|
||||||
|
"}",
|
||||||
|
XContentType.JSON)
|
||||||
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||||
|
).actionGet();
|
||||||
|
assertEquals(RestStatus.CREATED, indexRequest.status());
|
||||||
|
|
||||||
|
SearchResponse sourceSearchResponse = client().search(
|
||||||
|
new SearchRequest(sourceIndex)
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||||
|
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
|
||||||
|
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
|
||||||
|
assertNotNull(sourceDocMap);
|
||||||
|
assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
|
||||||
|
assertThat(sourceDocMap.get("key"), is(equalTo("key")));
|
||||||
|
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
|
||||||
|
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
|
||||||
|
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
|
||||||
|
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
|
||||||
|
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
|
||||||
|
}
|
||||||
|
|
||||||
|
String sourceIndexPattern = baseSourceName + "*";
|
||||||
|
List<String> enrichFields = Arrays.asList("idx", "field1", "field2", "field5");
|
||||||
|
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key",
|
||||||
|
enrichFields);
|
||||||
|
String policyName = "test1";
|
||||||
|
|
||||||
|
final long createTime = randomNonNegativeLong();
|
||||||
|
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
|
||||||
|
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
|
||||||
|
|
||||||
|
logger.info("Starting policy run");
|
||||||
|
enrichPolicyRunner.run();
|
||||||
|
latch.await();
|
||||||
|
if (exception.get() != null) {
|
||||||
|
throw exception.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate Index definition
|
||||||
|
String createdEnrichIndex = ".enrich-test1-" + createTime;
|
||||||
|
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
|
||||||
|
assertThat(enrichIndex.getIndices().length, equalTo(1));
|
||||||
|
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
|
||||||
|
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
|
||||||
|
assertNotNull(settings);
|
||||||
|
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
|
||||||
|
|
||||||
|
// Validate Mapping
|
||||||
|
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
|
||||||
|
assertThat(mapping.get("dynamic"), is("false"));
|
||||||
|
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
|
||||||
|
assertNotNull(properties);
|
||||||
|
assertThat(properties.size(), is(equalTo(1)));
|
||||||
|
Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
|
||||||
|
assertNotNull(keyfield);
|
||||||
|
assertThat(keyfield.get("type"), is(equalTo("keyword")));
|
||||||
|
assertThat(keyfield.get("doc_values"), is(false));
|
||||||
|
|
||||||
|
// Validate document structure
|
||||||
|
SearchResponse enrichSearchResponse = client().search(
|
||||||
|
new SearchRequest(".enrich-test1")
|
||||||
|
.source(SearchSourceBuilder.searchSource()
|
||||||
|
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||||
|
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
|
||||||
|
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
|
||||||
|
assertNotNull(enrichDocument);
|
||||||
|
assertThat(enrichDocument.size(), is(equalTo(5)));
|
||||||
|
assertThat(enrichDocument.get("key"), is(equalTo("key")));
|
||||||
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
|
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
|
||||||
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
|
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
|
||||||
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
|
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
|
||||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||||
import org.elasticsearch.ingest.IngestService;
|
import org.elasticsearch.ingest.IngestService;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
|
import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||||
|
@ -30,7 +31,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||||
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
|
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdatePolicyOnly() {
|
public void testUpdatePolicyOnly() {
|
||||||
|
|
Loading…
Reference in New Issue