diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java new file mode 100644 index 00000000000..e25ed646e87 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.IngestMetadata; +import org.elasticsearch.ingest.PipelineConfiguration; + +/** + * Manages the definitions and lifecycle of the ingest pipeline used by the reindex operation within the Enrich Policy execution. + */ +public class EnrichPolicyReindexPipeline { + + /** + * The current version of the pipeline definition. Used in the pipeline's name to differentiate from breaking changes + * (separate from product version). + */ + static final String CURRENT_PIPELINE_VERSION_NAME = "7"; + + /** + * The last version of the distribution that updated the pipelines definition. + * TODO: This should be the version of ES that Enrich first ships in, which likely doesn't exist yet. + */ + static final int ENRICH_PIPELINE_LAST_UPDATED_VERSION = Version.V_7_4_0.id; + + static String pipelineName() { + return "enrich-policy-reindex-" + CURRENT_PIPELINE_VERSION_NAME; + } + + /** + * Checks if the current version of the pipeline definition is installed in the cluster + * @param clusterState The cluster state to check + * @return true if a pipeline exists that is compatible with this version of Enrich, false otherwise + */ + static boolean exists(ClusterState clusterState) { + final IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE); + // we ensure that we both have the pipeline and its version represents the current (or later) version + if (ingestMetadata != null) { + final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName()); + if (pipeline != null) { + Object version = pipeline.getConfigAsMap().get("version"); + return version instanceof Number && ((Number) version).intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION; + } + } + return false; + } + + /** + * Creates a pipeline with the current version's pipeline definition + * @param client Client used to execute put pipeline + * @param listener Callback used after pipeline has been created + */ + public static void create(Client client, ActionListener listener) { + final BytesReference pipeline = BytesReference.bytes(currentEnrichPipelineDefinition(XContentType.JSON)); + final PutPipelineRequest request = new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON); + client.admin().cluster().putPipeline(request, listener); + } + + private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) { + try { + return XContentBuilder.builder(xContentType.xContent()) + .startObject() + .field("description", "This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup " + + "purposes. It is an internal pipeline and should not be modified.") + .field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION) + .startArray("processors") + .startObject() + // remove the id from the document so that documents from multiple indices will always be unique. + .startObject("remove") + .field("field", "_id") + .endObject() + .endObject() + .endArray() + .endObject(); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to create pipeline for enrich document sanitization", e); + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 55100ae4058..7c776650754 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -250,7 +250,7 @@ public class EnrichPolicyRunner implements Runnable { client.admin().indices().create(createEnrichIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { - transferDataToEnrichIndex(enrichIndexName); + prepareReindexOperation(enrichIndexName); } @Override @@ -260,6 +260,25 @@ public class EnrichPolicyRunner implements Runnable { }); } + private void prepareReindexOperation(final String destinationIndexName) { + // Check to make sure that the enrich pipeline exists, and create it if it is missing. + if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) { + EnrichPolicyReindexPipeline.create(client, new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + transferDataToEnrichIndex(destinationIndexName); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } else { + transferDataToEnrichIndex(destinationIndexName); + } + } + private void transferDataToEnrichIndex(final String destinationIndexName) { logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName); // Filter down the source fields to just the ones required by the policy @@ -277,6 +296,8 @@ public class EnrichPolicyRunner implements Runnable { .setSourceIndices(policy.getIndices().toArray(new String[0])); reindexRequest.getSearchRequest().source(searchSourceBuilder); reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE); + reindexRequest.getDestination().routing("discard"); + reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName()); client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 373cfbfbe8a..f234cd09d6c 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -46,7 +47,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class); + return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index 6b278abd1ca..1f128527b8b 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -59,7 +60,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class); + return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index f512afd12d0..9548bc40b46 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.enrich; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -42,6 +43,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -58,7 +60,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Collections.singletonList(ReindexPlugin.class); + return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class); } public void testRunner() throws Exception { @@ -165,6 +167,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { .source( "{" + "\"idx\":" + idx + "," + + "\"key\":" + "\"key" + idx + "\"," + "\"field1\":\"value1\"," + "\"field2\":2," + "\"field3\":\"ignored\"," + @@ -184,6 +187,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); assertNotNull(sourceDocMap); assertThat(sourceDocMap.get("idx"), is(equalTo(idx))); + assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx))); assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); assertThat(sourceDocMap.get("field2"), is(equalTo(2))); assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); @@ -194,10 +198,11 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { String sourceIndexPattern = baseSourceName + "*"; List enrichFields = new ArrayList<>(); enrichFields.add("idx"); + enrichFields.add("field1"); enrichFields.add("field2"); enrichFields.add("field5"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), - "field1", enrichFields); + "key", enrichFields); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -229,10 +234,10 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { Map properties = (Map) mapping.get("properties"); assertNotNull(properties); assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); + Map keyfield = (Map) properties.get("key"); + assertNotNull(keyfield); + assertThat(keyfield.get("type"), is(equalTo("keyword"))); + assertThat(keyfield.get("doc_values"), is(false)); // Validate document structure SearchResponse enrichSearchResponse = client().search( @@ -242,7 +247,223 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L)); Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); assertNotNull(enrichDocument); - assertThat(enrichDocument.size(), is(equalTo(4))); + assertThat(enrichDocument.size(), is(equalTo(5))); + assertThat(enrichDocument.get("key"), is(equalTo("key0"))); + assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); + assertThat(enrichDocument.get("field2"), is(equalTo(2))); + assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + + // Validate segments + validateSegments(createdEnrichIndex, 3); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerMultiSourceDocIdCollisions() throws Exception { + String baseSourceName = "source-index-"; + int numberOfSourceIndices = 3; + String collidingDocId = randomAlphaOfLength(10); + for (int idx = 0; idx < numberOfSourceIndices; idx++) { + final String sourceIndex = baseSourceName + idx; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id(collidingDocId) + .routing(collidingDocId + idx) + .source( + "{" + + "\"idx\":" + idx + "," + + "\"key\":" + "\"key" + idx + "\"," + + "\"field1\":\"value1\"," + + "\"field2\":2," + + "\"field3\":\"ignored\"," + + "\"field4\":\"ignored\"," + + "\"field5\":\"value5\"" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("idx"), is(equalTo(idx))); + assertThat(sourceDocMap.get("key"), is(equalTo("key" + idx))); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + + SearchResponse routingSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet(); + assertEquals(1L, routingSearchResponse.getHits().getTotalHits().value); + } + + String sourceIndexPattern = baseSourceName + "*"; + List enrichFields = Arrays.asList("idx", "field1", "field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key", + enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map keyfield = (Map) properties.get("key"); + assertNotNull(keyfield); + assertThat(keyfield.get("type"), is(equalTo("keyword"))); + assertThat(keyfield.get("doc_values"), is(false)); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(5))); + assertThat(enrichDocument.get("key"), is(equalTo("key0"))); + assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); + assertThat(enrichDocument.get("field2"), is(equalTo(2))); + assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + + // Validate removal of routing values + for (int idx = 0; idx < numberOfSourceIndices; idx++) { + SearchResponse routingSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchQuery("_routing", collidingDocId + idx)))).actionGet(); + assertEquals(0L, routingSearchResponse.getHits().getTotalHits().value); + } + + // Validate segments + validateSegments(createdEnrichIndex, 3); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception { + String baseSourceName = "source-index-"; + int numberOfSourceIndices = 3; + for (int idx = 0; idx < numberOfSourceIndices; idx++) { + final String sourceIndex = baseSourceName + idx; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id(randomAlphaOfLength(10)) + .source( + "{" + + "\"idx\":" + idx + "," + + "\"key\":" + "\"key\"," + + "\"field1\":\"value1\"," + + "\"field2\":2," + + "\"field3\":\"ignored\"," + + "\"field4\":\"ignored\"," + + "\"field5\":\"value5\"" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("idx"), is(equalTo(idx))); + assertThat(sourceDocMap.get("key"), is(equalTo("key"))); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + + String sourceIndexPattern = baseSourceName + "*"; + List enrichFields = Arrays.asList("idx", "field1", "field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern), "key", + enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map keyfield = (Map) properties.get("key"); + assertNotNull(keyfield); + assertThat(keyfield.get("type"), is(equalTo("keyword"))); + assertThat(keyfield.get("doc_values"), is(false)); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(3L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(5))); + assertThat(enrichDocument.get("key"), is(equalTo("key"))); assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); assertThat(enrichDocument.get("field2"), is(equalTo(2))); assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 5aa1d4484ed..0b67802d6e9 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -30,7 +31,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class); + return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); } public void testUpdatePolicyOnly() {