Added multi node enrich tests and fixed serialization issues. (#43386)

For now, the test covers the enrich APIs in a multi-node environment.
An ESIntegTestCase-based test was chosen over a real qa module in order
to avoid adding another module that starts a separate test cluster.
This commit is contained in:
Martijn van Groningen 2019-06-25 13:21:47 +02:00
parent d0634e444d
commit 36f0e8a8bb
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
8 changed files with 258 additions and 13 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@ -35,6 +36,12 @@ public class ListEnrichPolicyAction extends Action<ListEnrichPolicyAction.Respon
// Legacy Streamable entry point; intentionally unsupported because this
// action's response is deserialized via getResponseReader() (Writeable).
public Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
// Supplies the Writeable reader used to deserialize this action's response
// from the transport stream.
@Override
public Writeable.Reader<Response> getResponseReader() {
    return in -> new Response(in);
}
public static class Request extends MasterNodeRequest<ListEnrichPolicyAction.Request> {
public Request() {}

View File

@ -21,9 +21,20 @@ run {
plugin xpackModule('core')
}
// No tests yet:
// No real integ tests in the module:
integTest.enabled = false
// Instead we create a separate task to run the tests based on ESIntegTestCase
// Dedicated task for the ESIntegTestCase-based tests that live in this module.
task internalClusterTest(type: Test) {
description = '🌈🌈🌈🦄 Welcome to fantasy integration tests land! 🦄🌈🌈🌈'
// run only after the plain unit tests have passed
mustRunAfter test
// pick up classes following the *IT integration-test naming convention
include '**/*IT.class'
// NOTE(review): presumably prevents Netty's available-processors auto-detection
// from clashing when multiple nodes share one test JVM — confirm.
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}
// wire the new task into `check`, since the standard integTest task is disabled
check.dependsOn internalClusterTest
// add all sub-projects of the qa sub-project
gradle.projectsEvaluated {
project.subprojects

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -54,7 +55,6 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
@ -74,13 +74,12 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher);
parameters.ingestService.addIngestClusterStateListener(factory);
return Collections.singletonMap(EnrichProcessorFactory.TYPE,
factory);
return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory);
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
return emptyList();
return Collections.emptyList();
}
return Arrays.asList(
@ -97,7 +96,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
if (enabled == false) {
return emptyList();
return Collections.emptyList();
}
return Arrays.asList(
@ -116,22 +115,26 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
new IndexNameExpressionResolver(), System::currentTimeMillis);
return Collections.singleton(enrichPolicyExecutor);
return Arrays.asList(enrichPolicyExecutor);
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE,
EnrichMetadata::new));
return Arrays.asList(
new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, EnrichMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, EnrichMetadata.TYPE,
in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in))
);
}
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Collections.singletonList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE),
EnrichMetadata::fromXContent));
return Arrays.asList(
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE), EnrichMetadata::fromXContent)
);
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ENRICH_FETCH_SIZE_SETTING);
return Arrays.asList(ENRICH_FETCH_SIZE_SETTING);
}
}

View File

@ -15,11 +15,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichStore;
import java.io.IOException;
public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
@Inject
@ -42,6 +45,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
// Writeable-based deserialization of the delete-policy response from the
// transport stream (replaces the legacy Streamable newResponse()).
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
    final AcknowledgedResponse response = new AcknowledgedResponse(in);
    return response;
}
@Override
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {

View File

@ -15,12 +15,15 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
import org.elasticsearch.xpack.enrich.PolicyExecutionResult;
import java.io.IOException;
public class TransportExecuteEnrichPolicyAction
extends TransportMasterNodeReadAction<ExecuteEnrichPolicyAction.Request, AcknowledgedResponse> {
@ -48,6 +51,11 @@ public class TransportExecuteEnrichPolicyAction
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
// Writeable-based deserialization of the execute-policy response from the
// transport stream (replaces the legacy Streamable newResponse()).
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
    final AcknowledgedResponse response = new AcknowledgedResponse(in);
    return response;
}
@Override
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

View File

@ -14,12 +14,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichStore;
import java.io.IOException;
import java.util.Map;
public class TransportListEnrichPolicyAction
@ -40,6 +42,11 @@ public class TransportListEnrichPolicyAction
return ThreadPool.Names.SAME;
}
// Writeable-based deserialization of the list-policies response from the
// transport stream (replaces the legacy Streamable newResponse()).
@Override
protected ListEnrichPolicyAction.Response read(StreamInput in) throws IOException {
    final ListEnrichPolicyAction.Response response = new ListEnrichPolicyAction.Response(in);
    return response;
}
@Override
protected ListEnrichPolicyAction.Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");

View File

@ -15,11 +15,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichStore;
import java.io.IOException;
public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<PutEnrichPolicyAction.Request, AcknowledgedResponse> {
@Inject
@ -37,11 +40,15 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<Pu
return ThreadPool.Names.SAME;
}
// Legacy Streamable entry point; intentionally unsupported because this
// transport action deserializes its response via read(StreamInput).
@Override
protected AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
// Writeable-based deserialization of the put-policy response from the
// transport stream (replaces the legacy Streamable newResponse()).
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
    final AcknowledgedResponse response = new AcknowledgedResponse(in);
    return response;
}
@Override
protected void masterOperation(PutEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

View File

@ -0,0 +1,194 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
// Multi-node integration tests for the enrich APIs (put/get/list/execute/delete)
// and for running the enrich processor through an ingest pipeline.
// Scope.TEST with numDataNodes = 0 means every test method gets a fresh internal
// cluster and starts its own nodes explicitly via internalCluster().
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class EnrichMultiNodeIT extends ESIntegTestCase {
// Base name for enrich policies (testEnrichAPIs suffixes it with a counter).
private static final String POLICY_NAME = "my-policy";
// Ingest pipeline that wraps the enrich processor.
private static final String PIPELINE_NAME = "my-pipeline";
// Index that the enrich policies read their data from.
private static final String SOURCE_INDEX_NAME = "users";
// Field the enrich processor matches documents on.
private static final String KEY_FIELD = "email";
// Fields copied from the matched enrich document into the ingested document.
private static final String[] DECORATE_FIELDS = new String[]{"address", "city", "country"};
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
// NOTE(review): ReindexPlugin is presumably required because policy execution
// reindexes the source index into the enrich index — confirm.
return Arrays.asList(EnrichPlugin.class, ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
// Exercises the policy CRUD + execute APIs against a multi-node cluster and
// verifies that executing a policy builds an enrich index containing one
// document per source document.
public void testEnrichAPIs() {
final int numPolicies = randomIntBetween(2, 4);
internalCluster().startNodes(randomIntBetween(2, 3));
int numDocsInSourceIndex = randomIntBetween(8, 32);
createSourceIndex(numDocsInSourceIndex);
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
// The get API must return exactly the policy that was stored.
EnrichPolicy result =
client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request(policyName)).actionGet().getPolicy();
assertThat(result, equalTo(enrichPolicy));
// Executing the policy creates backing enrich indices; check their doc count.
String enrichIndexPrefix = EnrichPolicy.getBaseName(policyName) + "*";
refresh(enrichIndexPrefix);
SearchResponse searchResponse = client().search(new SearchRequest(enrichIndexPrefix)).actionGet();
assertThat(searchResponse.getHits().getTotalHits().relation, equalTo(TotalHits.Relation.EQUAL_TO));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numDocsInSourceIndex));
}
// All created policies are visible through the list API ...
ListEnrichPolicyAction.Response response =
client().execute(ListEnrichPolicyAction.INSTANCE, new ListEnrichPolicyAction.Request()).actionGet();
assertThat(response.getPolicies().size(), equalTo(numPolicies));
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policyName)).actionGet();
}
// ... and gone again after deleting them.
response = client().execute(ListEnrichPolicyAction.INSTANCE, new ListEnrichPolicyAction.Request()).actionGet();
assertThat(response.getPolicies().size(), equalTo(0));
}
// End-to-end enrichment on a three-node cluster, coordinated by a random node.
public void testEnrich() {
List<String> nodes = internalCluster().startNodes(3);
List<String> keys = createSourceIndex(64);
createAndExecutePolicy();
createPipeline();
enrich(keys, randomFrom(nodes));
}
// Same as testEnrich, but the bulk request is coordinated by a dedicated
// ingest-only node (master and data roles disabled).
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/43311")
public void testEnrichDedicatedIngestNode() {
internalCluster().startNode();
Settings settings = Settings.builder()
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_INGEST_SETTING.getKey(), true)
.build();
String ingestOnlyNode = internalCluster().startNode(settings);
List<String> keys = createSourceIndex(64);
createAndExecutePolicy();
createPipeline();
enrich(keys, ingestOnlyNode);
}
// Bulk-indexes documents through PIPELINE_NAME via the given coordinating node,
// then asserts that every indexed document was decorated with all DECORATE_FIELDS.
private static void enrich(List<String> keys, String coordinatingNode) {
int numDocs = 256;
BulkRequest bulkRequest = new BulkRequest("my-index");
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(PIPELINE_NAME);
// each document carries only the key field; the pipeline adds the rest
indexRequest.source(Collections.singletonMap(KEY_FIELD, randomFrom(keys)));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client(coordinatingNode).bulk(bulkRequest).actionGet();
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
// the key field plus one entry per decorate field
assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length));
for (String field : DECORATE_FIELDS) {
assertThat(source.get(field), notNullValue());
}
}
}
// Indexes numDocs documents with unique random keys into SOURCE_INDEX_NAME and
// returns the keys, so callers can index documents guaranteed to match the policy.
private static List<String> createSourceIndex(int numDocs) {
Set<String> keys = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
String key;
do {
key = randomAlphaOfLength(16);
} while (keys.add(key) == false); // Set.add returns false on duplicates: retry until unique
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.create(true);
indexRequest.id(key);
indexRequest.source(mapOf(KEY_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4),
DECORATE_FIELDS[1], randomAlphaOfLength(4), DECORATE_FIELDS[2], randomAlphaOfLength(4)));
client().index(indexRequest).actionGet();
}
// make the documents visible to the policy execution that follows
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
return new ArrayList<>(keys);
}
// Stores an exact-match policy on KEY_FIELD over the source index and executes it.
private static void createAndExecutePolicy() {
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
}
// Registers an ingest pipeline with a single enrich processor that copies each
// DECORATE_FIELDS entry from the matched enrich document (source == target names).
private static void createPipeline() {
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME +
"\", \"enrich_values\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
"]}}]}";
PutPipelineRequest request = new PutPipelineRequest(PIPELINE_NAME, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(request).actionGet();
}
// Inline four-entry map builder.
// NOTE(review): presumably exists because the module targets a language level
// without Map.of (Java 9+) — confirm against the build configuration.
private static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
map.put(key4, value4);
return map;
}
}