From 36f0e8a8bb13e2564149d5cfcfe58eef47a55472 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Jun 2019 13:21:47 +0200 Subject: [PATCH] Added multi node enrich tests and fixed serialization issues. (#43386) The test for now tests the enrich APIs in a multi node environment. Picked EsIntegTestCase test over a real qa module in order to avoid adding another module that starts a test cluster. --- .../enrich/action/ListEnrichPolicyAction.java | 7 + x-pack/plugin/enrich/build.gradle | 13 +- .../xpack/enrich/EnrichPlugin.java | 25 ++- .../TransportDeleteEnrichPolicyAction.java | 8 + .../TransportExecuteEnrichPolicyAction.java | 8 + .../TransportListEnrichPolicyAction.java | 7 + .../TransportPutEnrichPolicyAction.java | 9 +- .../xpack/enrich/EnrichMultiNodeIT.java | 194 ++++++++++++++++++ 8 files changed, 258 insertions(+), 13 deletions(-) create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ListEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ListEnrichPolicyAction.java index 0353c18d890..6362808e990 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ListEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ListEnrichPolicyAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -35,6 +36,12 @@ public class ListEnrichPolicyAction extends Action getResponseReader() { + return Response::new; + } + public static class Request extends MasterNodeRequest { public Request() {} diff --git a/x-pack/plugin/enrich/build.gradle b/x-pack/plugin/enrich/build.gradle index 84950b7f920..36b5fa18d2c 100644 --- a/x-pack/plugin/enrich/build.gradle +++ b/x-pack/plugin/enrich/build.gradle @@ -21,9 +21,20 @@ run { plugin xpackModule('core') } -// No tests yet: +// No real integ tests in the module: integTest.enabled = false +// Instead we create a separate task to run the tests based on ESIntegTestCase +task internalClusterTest(type: Test) { + description = '🌈🌈🌈🦄 Welcome to fantasy integration tests land! 🦄🌈🌈🌈' + mustRunAfter test + + include '**/*IT.class' + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + +check.dependsOn internalClusterTest + // add all sub-projects of the qa sub-project gradle.projectsEvaluated { project.subprojects diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 029a39356a0..65247d8ddd7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -54,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import static java.util.Collections.emptyList; import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING; public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { @@ -74,13 +74,12 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { public Map getProcessors(Processor.Parameters parameters) { EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher); parameters.ingestService.addIngestClusterStateListener(factory); - return Collections.singletonMap(EnrichProcessorFactory.TYPE, - factory); + return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory); } public List> getActions() { if (enabled == false) { - return emptyList(); + return Collections.emptyList(); } return Arrays.asList( @@ -97,7 +96,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { if (enabled == false) { - return emptyList(); + return Collections.emptyList(); } return Arrays.asList( @@ -116,22 +115,26 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool, new IndexNameExpressionResolver(), System::currentTimeMillis); - return Collections.singleton(enrichPolicyExecutor); + return Arrays.asList(enrichPolicyExecutor); } @Override public List getNamedWriteables() { - return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, - EnrichMetadata::new)); + return Arrays.asList( + new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, EnrichMetadata::new), + new NamedWriteableRegistry.Entry(NamedDiff.class, EnrichMetadata.TYPE, + in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in)) + ); } public List getNamedXContent() { - return Collections.singletonList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE), - EnrichMetadata::fromXContent)); + return Arrays.asList( + new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE), EnrichMetadata::fromXContent) + ); } @Override public List> getSettings() { - return Collections.singletonList(ENRICH_FETCH_SIZE_SETTING); + return Arrays.asList(ENRICH_FETCH_SIZE_SETTING); } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java index dd7ebc6a497..aba92f8d004 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java @@ -15,11 +15,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichStore; +import java.io.IOException; + public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction { @Inject @@ -42,6 +45,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + @Override protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state, ActionListener listener) throws Exception { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index c8df791dca2..b84b393cb3b 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -15,12 +15,15 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; import org.elasticsearch.xpack.enrich.PolicyExecutionResult; +import java.io.IOException; + public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeReadAction { @@ -48,6 +51,11 @@ public class TransportExecuteEnrichPolicyAction throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + @Override protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, ActionListener listener) { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportListEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportListEnrichPolicyAction.java index 3516e6167e5..9e87943496e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportListEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportListEnrichPolicyAction.java @@ -14,12 +14,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichStore; +import java.io.IOException; import java.util.Map; public class TransportListEnrichPolicyAction @@ -40,6 +42,11 @@ public class TransportListEnrichPolicyAction return ThreadPool.Names.SAME; } + @Override + protected ListEnrichPolicyAction.Response read(StreamInput in) throws IOException { + return new ListEnrichPolicyAction.Response(in); + } + @Override protected ListEnrichPolicyAction.Response newResponse() { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportPutEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportPutEnrichPolicyAction.java index c6d0c9ae5c3..9c0c428c32d 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportPutEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportPutEnrichPolicyAction.java @@ -15,11 +15,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichStore; +import java.io.IOException; + public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction { @Inject @@ -37,11 +40,15 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction listener) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java new file mode 100644 index 00000000000..46816c489f0 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class EnrichMultiNodeIT extends ESIntegTestCase { + + private static final String POLICY_NAME = "my-policy"; + private static final String PIPELINE_NAME = "my-pipeline"; + private static final String SOURCE_INDEX_NAME = "users"; + private static final String KEY_FIELD = "email"; + private static final String[] DECORATE_FIELDS = new String[]{"address", "city", "country"}; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(EnrichPlugin.class, ReindexPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + public void testEnrichAPIs() { + final int numPolicies = randomIntBetween(2, 4); + internalCluster().startNodes(randomIntBetween(2, 3)); + int numDocsInSourceIndex = randomIntBetween(8, 32); + createSourceIndex(numDocsInSourceIndex); + + for (int i = 0; i < numPolicies; i++) { + String policyName = POLICY_NAME + i; + EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, + Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS)); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + EnrichPolicy result = + client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request(policyName)).actionGet().getPolicy(); + assertThat(result, equalTo(enrichPolicy)); + String enrichIndexPrefix = EnrichPolicy.getBaseName(policyName) + "*"; + refresh(enrichIndexPrefix); + SearchResponse searchResponse = client().search(new SearchRequest(enrichIndexPrefix)).actionGet(); + assertThat(searchResponse.getHits().getTotalHits().relation, equalTo(TotalHits.Relation.EQUAL_TO)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numDocsInSourceIndex)); + } + + ListEnrichPolicyAction.Response response = + client().execute(ListEnrichPolicyAction.INSTANCE, new ListEnrichPolicyAction.Request()).actionGet(); + assertThat(response.getPolicies().size(), equalTo(numPolicies)); + + for (int i = 0; i < numPolicies; i++) { + String policyName = POLICY_NAME + i; + client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policyName)).actionGet(); + } + + response = client().execute(ListEnrichPolicyAction.INSTANCE, new ListEnrichPolicyAction.Request()).actionGet(); + assertThat(response.getPolicies().size(), equalTo(0)); + } + + public void testEnrich() { + List nodes = internalCluster().startNodes(3); + List keys = createSourceIndex(64); + createAndExecutePolicy(); + createPipeline(); + enrich(keys, randomFrom(nodes)); + } + + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/43311") + public void testEnrichDedicatedIngestNode() { + internalCluster().startNode(); + Settings settings = Settings.builder() + .put(Node.NODE_MASTER_SETTING.getKey(), false) + .put(Node.NODE_DATA_SETTING.getKey(), false) + .put(Node.NODE_INGEST_SETTING.getKey(), true) + .build(); + String ingestOnlyNode = internalCluster().startNode(settings); + + List keys = createSourceIndex(64); + createAndExecutePolicy(); + createPipeline(); + enrich(keys, ingestOnlyNode); + } + + private static void enrich(List keys, String coordinatingNode) { + int numDocs = 256; + BulkRequest bulkRequest = new BulkRequest("my-index"); + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.id(Integer.toString(i)); + indexRequest.setPipeline(PIPELINE_NAME); + indexRequest.source(Collections.singletonMap(KEY_FIELD, randomFrom(keys))); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client(coordinatingNode).bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + + for (int i = 0; i < numDocs; i++) { + GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); + Map source = getResponse.getSourceAsMap(); + assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length)); + for (String field : DECORATE_FIELDS) { + assertThat(source.get(field), notNullValue()); + } + } + } + + private static List createSourceIndex(int numDocs) { + Set keys = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String key; + do { + key = randomAlphaOfLength(16); + } while (keys.add(key) == false); + + IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); + indexRequest.create(true); + indexRequest.id(key); + indexRequest.source(mapOf(KEY_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4), + DECORATE_FIELDS[1], randomAlphaOfLength(4), DECORATE_FIELDS[2], randomAlphaOfLength(4))); + client().index(indexRequest).actionGet(); + } + client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); + return new ArrayList<>(keys); + } + + private static void createAndExecutePolicy() { + EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, + Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS)); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet(); + } + + private static void createPipeline() { + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME + + "\", \"enrich_values\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," + + "{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," + + "{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" + + "]}}]}"; + PutPipelineRequest request = new PutPipelineRequest(PIPELINE_NAME, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(request).actionGet(); + } + + private static Map mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) { + Map map = new HashMap<>(); + map.put(key1, value1); + map.put(key2, value2); + map.put(key3, value3); + map.put(key4, value4); + return map; + } +}