From a66c0dcd957c9b0a132e12425cf37ad88326203d Mon Sep 17 00:00:00 2001
From: James Baiera
Date: Tue, 24 Sep 2019 15:16:40 -0400
Subject: [PATCH] Add pipeline to ensure unique Enrich index documents (#46348)

Adds an ingest pipeline that strips the ids from documents (and, via the reindex
request, discards their routing) before they are indexed into enrich indices.
Enrich documents may come from multiple source indices, so their ids can collide.
The pipeline ensures that documents with colliding ids do not clobber one another
during the reindex operation that runs while executing an enrich policy.
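For illustration, this is the shape of the collision the change guards against.
The index names, ids, and field values below are made up, in the style of the new
tests, and client() stands in for a test cluster client:

    // Two source documents that legitimately share an _id. Reindexed verbatim into
    // a single enrich index, the second write would overwrite the first. With the
    // reindex pipeline in place, the "remove" processor drops _id, so each document
    // is stored under a freshly auto-generated id and both survive the copy.
    client().index(new IndexRequest()
        .index("source-index-0")
        .id("shared-id")
        .source("{\"key\":\"key0\"}", XContentType.JSON)).actionGet();
    client().index(new IndexRequest()
        .index("source-index-1")
        .id("shared-id")
        .source("{\"key\":\"key1\"}", XContentType.JSON)).actionGet();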
---
 .../enrich/EnrichPolicyReindexPipeline.java   |  94 +++++++
 .../xpack/enrich/EnrichPolicyRunner.java      |  23 +-
 .../xpack/enrich/BasicEnrichTests.java        |   3 +-
 .../xpack/enrich/EnrichMultiNodeIT.java       |   3 +-
 .../xpack/enrich/EnrichPolicyRunnerTests.java | 235 +++++++++++++++++-
 .../xpack/enrich/EnrichPolicyUpdateTests.java |   3 +-
 6 files changed, 350 insertions(+), 11 deletions(-)
 create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java

diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java
new file mode 100644
index 00000000000..e25ed646e87
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.ingest.IngestMetadata;
+import org.elasticsearch.ingest.PipelineConfiguration;
+
+/**
+ * Manages the definitions and lifecycle of the ingest pipeline used by the reindex operation within the Enrich Policy execution.
+ */
+public class EnrichPolicyReindexPipeline {
+
+    /**
+     * The current version of the pipeline definition. Used in the pipeline's name to differentiate from breaking changes
+     * (separate from product version).
+     */
+    static final String CURRENT_PIPELINE_VERSION_NAME = "7";
+
+    /**
+     * The last version of the distribution that updated the pipelines definition.
+     * TODO: This should be the version of ES that Enrich first ships in, which likely doesn't exist yet.
+     */
+    static final int ENRICH_PIPELINE_LAST_UPDATED_VERSION = Version.V_7_4_0.id;
+
+    static String pipelineName() {
+        return "enrich-policy-reindex-" + CURRENT_PIPELINE_VERSION_NAME;
+    }
+
+    /**
+     * Checks if the current version of the pipeline definition is installed in the cluster
+     * @param clusterState The cluster state to check
+     * @return true if a pipeline exists that is compatible with this version of Enrich, false otherwise
+     */
+    static boolean exists(ClusterState clusterState) {
+        final IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE);
+        // we ensure that we both have the pipeline and its version represents the current (or later) version
+        if (ingestMetadata != null) {
+            final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName());
+            if (pipeline != null) {
+                Object version = pipeline.getConfigAsMap().get("version");
+                return version instanceof Number && ((Number) version).intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Creates a pipeline with the current version's pipeline definition
+     * @param client Client used to execute put pipeline
+     * @param listener Callback used after pipeline has been created
+     */
+    public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
+        final BytesReference pipeline = BytesReference.bytes(currentEnrichPipelineDefinition(XContentType.JSON));
+        final PutPipelineRequest request = new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON);
+        client.admin().cluster().putPipeline(request, listener);
+    }
+
+    private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) {
+        try {
+            return XContentBuilder.builder(xContentType.xContent())
+                .startObject()
+                    .field("description", "This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup " +
+                        "purposes. It is an internal pipeline and should not be modified.")
+                    .field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION)
+                    .startArray("processors")
+                        .startObject()
+                            // remove the id from the document so that documents from multiple indices will always be unique.
+                            .startObject("remove")
+                                .field("field", "_id")
+                            .endObject()
+                        .endObject()
+                    .endArray()
+                .endObject();
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Failed to create pipeline for enrich document sanitization", e);
+        }
+    }
+
+}
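The definition built by currentEnrichPipelineDefinition() is a single remove processor
plus a version marker, roughly { "description": "...", "version":
ENRICH_PIPELINE_LAST_UPDATED_VERSION, "processors": [ { "remove": { "field": "_id" } } ] }.
A minimal sketch of how the installed pipeline could be inspected (not part of the change;
client and logger are assumed to be available to the caller):

    GetPipelineRequest request = new GetPipelineRequest(EnrichPolicyReindexPipeline.pipelineName());
    client.admin().cluster().getPipeline(request, ActionListener.wrap(
        response -> response.pipelines().forEach(
            // logs the stored id and config map of the enrich reindex pipeline
            pipeline -> logger.debug("pipeline [{}]: {}", pipeline.getId(), pipeline.getConfigAsMap())),
        e -> logger.warn("failed to look up the enrich reindex pipeline", e)));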
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java
index 55100ae4058..7c776650754 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java
@@ -250,7 +250,7 @@ public class EnrichPolicyRunner implements Runnable {
         client.admin().indices().create(createEnrichIndexRequest, new ActionListener<CreateIndexResponse>() {
             @Override
             public void onResponse(CreateIndexResponse createIndexResponse) {
-                transferDataToEnrichIndex(enrichIndexName);
+                prepareReindexOperation(enrichIndexName);
             }
 
             @Override
@@ -260,6 +260,25 @@
         });
     }
 
+    private void prepareReindexOperation(final String destinationIndexName) {
+        // Check to make sure that the enrich pipeline exists, and create it if it is missing.
+        if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) {
+            EnrichPolicyReindexPipeline.create(client, new ActionListener<AcknowledgedResponse>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    transferDataToEnrichIndex(destinationIndexName);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+            });
+        } else {
+            transferDataToEnrichIndex(destinationIndexName);
+        }
+    }
+
     private void transferDataToEnrichIndex(final String destinationIndexName) {
         logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
         // Filter down the source fields to just the ones required by the policy
@@ -277,6 +296,8 @@
             .setSourceIndices(policy.getIndices().toArray(new String[0]));
         reindexRequest.getSearchRequest().source(searchSourceBuilder);
         reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
+        reindexRequest.getDestination().routing("discard");
+        reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName());
         client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
             @Override
             public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
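Taken by itself, the destination side of the reindex request now looks roughly like the
sketch below; "discard" is the reindex keyword that drops whatever routing the copied
hits carried, and the pipeline strips _id before the documents reach the enrich index
(the index names here are illustrative only):

    ReindexRequest reindexRequest = new ReindexRequest()
        .setSourceIndices("source-index-*")
        .setDestIndex(".enrich-example-1234");
    // Copies land in the enrich index without their original routing or _id.
    reindexRequest.getDestination().routing("discard");
    reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName());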
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
index 373cfbfbe8a..f234cd09d6c 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -46,7 +47,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
+        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
     }
 
     @Override
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
index 6b278abd1ca..1f128527b8b 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
@@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -59,7 +60,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
+        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
     }
 
     @Override
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java
index f512afd12d0..9548bc40b46 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java
@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.enrich;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -42,6 +43,7 @@ import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -58,7 +60,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return Collections.singletonList(ReindexPlugin.class);
+        return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class);
     }
 
     public void testRunner() throws Exception {
@@ -165,6 +167,7 @@
                 .source(
                     "{" +
                     "\"idx\":" + idx + "," +
+                    "\"key\":" + "\"key" + idx + "\"," +
                     "\"field1\":\"value1\"," +
                     "\"field2\":2," +
                     "\"field3\":\"ignored\"," +
@@ -184,6 +187,7 @@
             Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
             assertNotNull(sourceDocMap);
             assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
+            assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx)));
             assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
             assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
             assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
@@ -194,10 +198,11 @@
         String sourceIndexPattern = baseSourceName + "*";
         List<String> enrichFields = new ArrayList<>();
         enrichFields.add("idx");
+        enrichFields.add("field1");
         enrichFields.add("field2");
         enrichFields.add("field5");
         EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern),
-            "field1", enrichFields);
+            "key", enrichFields);
         String policyName = "test1";
 
         final long createTime = randomNonNegativeLong();
@@ -229,10 +234,10 @@
         Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
         assertNotNull(properties);
         assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
+        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
+        assertNotNull(keyfield);
+        assertThat(keyfield.get("type"), is(equalTo("keyword")));
+        assertThat(keyfield.get("doc_values"), is(false));
 
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
@@ -242,7 +247,223 @@
         assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
         Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
         assertNotNull(enrichDocument);
-        assertThat(enrichDocument.size(), is(equalTo(4)));
+        assertThat(enrichDocument.size(), is(equalTo(5)));
+        assertThat(enrichDocument.get("key"), is(equalTo("key0")));
+        assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
+        assertThat(enrichDocument.get("field2"), is(equalTo(2)));
+        assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
+
+        // Validate segments
+        validateSegments(createdEnrichIndex, 3);
+
+        // Validate Index is read only
+        ensureEnrichIndexIsReadOnly(createdEnrichIndex);
+    }
+
+    public void testRunnerMultiSourceDocIdCollisions() throws Exception {
+        String baseSourceName = "source-index-";
+        int numberOfSourceIndices = 3;
+        String collidingDocId = randomAlphaOfLength(10);
+        for (int idx = 0; idx < numberOfSourceIndices; idx++) {
+            final String sourceIndex = baseSourceName + idx;
+            IndexResponse indexRequest = client().index(new IndexRequest()
+                .index(sourceIndex)
+                .id(collidingDocId)
+                .routing(collidingDocId + idx)
+                .source(
+                    "{" +
+                    "\"idx\":" + idx + "," +
+                    "\"key\":" + "\"key" + idx + "\"," +
+                    "\"field1\":\"value1\"," +
+                    "\"field2\":2," +
+                    "\"field3\":\"ignored\"," +
+                    "\"field4\":\"ignored\"," +
+                    "\"field5\":\"value5\"" +
+                    "}",
+                    XContentType.JSON)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            ).actionGet();
+            assertEquals(RestStatus.CREATED, indexRequest.status());
+
+            SearchResponse sourceSearchResponse = client().search(
+                new SearchRequest(sourceIndex)
+                    .source(SearchSourceBuilder.searchSource()
+                        .query(QueryBuilders.matchAllQuery()))).actionGet();
+            assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
+            Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
+            assertNotNull(sourceDocMap);
+            assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
+            assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx)));
+            assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
+            assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
+            assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
+            assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
+            assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
+
+            SearchResponse routingSearchResponse = client().search(
+                new SearchRequest(sourceIndex)
+                    .source(SearchSourceBuilder.searchSource()
+                        .query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet();
+            assertEquals(1L, routingSearchResponse.getHits().getTotalHits().value);
+        }
+
+        String sourceIndexPattern = baseSourceName + "*";
+        List<String> enrichFields = Arrays.asList("idx", "field1", "field2", "field5");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key",
+            enrichFields);
+        String policyName = "test1";
+
+        final long createTime = randomNonNegativeLong();
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        ActionListener listener = createTestListener(latch, exception::set);
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
+
+        logger.info("Starting policy run");
+        enrichPolicyRunner.run();
+        latch.await();
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+
+        // Validate Index definition
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
+        assertThat(enrichIndex.getIndices().length, equalTo(1));
+        assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
+        Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
+        assertNotNull(settings);
+        assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
+
+        // Validate Mapping
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
+        assertThat(mapping.get("dynamic"), is("false"));
+        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
+        assertNotNull(properties);
+        assertThat(properties.size(), is(equalTo(1)));
+        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
+        assertNotNull(keyfield);
+        assertThat(keyfield.get("type"), is(equalTo("keyword")));
+        assertThat(keyfield.get("doc_values"), is(false));
+
+        // Validate document structure
+        SearchResponse enrichSearchResponse = client().search(
+            new SearchRequest(".enrich-test1")
+                .source(SearchSourceBuilder.searchSource()
+                    .query(QueryBuilders.matchAllQuery()))).actionGet();
+        assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
+        Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
+        assertNotNull(enrichDocument);
+        assertThat(enrichDocument.size(), is(equalTo(5)));
+        assertThat(enrichDocument.get("key"), is(equalTo("key0")));
+        assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
+        assertThat(enrichDocument.get("field2"), is(equalTo(2)));
+        assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
+
+        // Validate removal of routing values
+        for (int idx = 0; idx < numberOfSourceIndices; idx++) {
+            SearchResponse routingSearchResponse = client().search(
+                new SearchRequest(".enrich-test1")
+                    .source(SearchSourceBuilder.searchSource()
+                        .query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet();
+            assertEquals(0L, routingSearchResponse.getHits().getTotalHits().value);
+        }
+
+        // Validate segments
+        validateSegments(createdEnrichIndex, 3);
+
+        // Validate Index is read only
+        ensureEnrichIndexIsReadOnly(createdEnrichIndex);
+    }
+
+    public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception {
+        String baseSourceName = "source-index-";
+        int numberOfSourceIndices = 3;
+        for (int idx = 0; idx < numberOfSourceIndices; idx++) {
+            final String sourceIndex = baseSourceName + idx;
+            IndexResponse indexRequest = client().index(new IndexRequest()
+                .index(sourceIndex)
+                .id(randomAlphaOfLength(10))
+                .source(
+                    "{" +
+                    "\"idx\":" + idx + "," +
+                    "\"key\":" + "\"key\"," +
+                    "\"field1\":\"value1\"," +
+                    "\"field2\":2," +
+                    "\"field3\":\"ignored\"," +
+                    "\"field4\":\"ignored\"," +
+                    "\"field5\":\"value5\"" +
+                    "}",
+                    XContentType.JSON)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            ).actionGet();
+            assertEquals(RestStatus.CREATED, indexRequest.status());
+
+            SearchResponse sourceSearchResponse = client().search(
+                new SearchRequest(sourceIndex)
+                    .source(SearchSourceBuilder.searchSource()
+                        .query(QueryBuilders.matchAllQuery()))).actionGet();
+            assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
+            Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
+            assertNotNull(sourceDocMap);
+            assertThat(sourceDocMap.get("idx"), is(equalTo(idx)));
+            assertThat(sourceDocMap.get("key"), is(equalTo("key")));
+            assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
+            assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
+            assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
+            assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
+            assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
+        }
+
+        String sourceIndexPattern = baseSourceName + "*";
+        List<String> enrichFields = Arrays.asList("idx", "field1", "field2", "field5");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key",
+            enrichFields);
+        String policyName = "test1";
+
+        final long createTime = randomNonNegativeLong();
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        ActionListener listener = createTestListener(latch, exception::set);
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
+
+        logger.info("Starting policy run");
+        enrichPolicyRunner.run();
+        latch.await();
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+
+        // Validate Index definition
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
+        assertThat(enrichIndex.getIndices().length, equalTo(1));
+        assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
+        Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
+        assertNotNull(settings);
+        assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
+
+        // Validate Mapping
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
+        assertThat(mapping.get("dynamic"), is("false"));
+        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
+        assertNotNull(properties);
+        assertThat(properties.size(), is(equalTo(1)));
+        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
+        assertNotNull(keyfield);
+        assertThat(keyfield.get("type"), is(equalTo("keyword")));
+        assertThat(keyfield.get("doc_values"), is(false));
+
+        // Validate document structure
+        SearchResponse enrichSearchResponse = client().search(
+            new SearchRequest(".enrich-test1")
+                .source(SearchSourceBuilder.searchSource()
+                    .query(QueryBuilders.matchAllQuery()))).actionGet();
+        assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L));
+        Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
+        assertNotNull(enrichDocument);
+        assertThat(enrichDocument.size(), is(equalTo(5)));
+        assertThat(enrichDocument.get("key"), is(equalTo("key")));
         assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
         assertThat(enrichDocument.get("field2"), is(equalTo(2)));
         assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java
index 5aa1d4484ed..0b67802d6e9 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java
@@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -30,7 +31,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
+        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
     }
 
     public void testUpdatePolicyOnly() {
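As a closing usage sketch (not part of the patch): once a policy has been executed in a
node-level test, the reindex pipeline installed as a side effect can be looked up through
the same IngestService/Pipeline types already imported above.

    // getInstanceFromNode comes from ESSingleNodeTestCase; the pipeline should exist
    // under its versioned name after the first policy run.
    IngestService ingestService = getInstanceFromNode(IngestService.class);
    Pipeline pipeline = ingestService.getPipeline(EnrichPolicyReindexPipeline.pipelineName());
    assertNotNull(pipeline);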