Add enrich transport client support (#46002)

This commit adds an enrich client, as well as a smoke test to validate
the client works.
This commit is contained in:
Michael Basnight 2019-08-29 09:10:07 -05:00 committed by GitHub
parent 1157224a6b
commit 51a703da29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 170 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.protocol.xpack.frozen.FreezeResponse;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.enrich.client.EnrichClient;
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
import org.elasticsearch.xpack.core.ilm.client.ILMClient;
import org.elasticsearch.xpack.core.ml.client.MachineLearningClient;
@ -43,6 +44,7 @@ public class XPackClient {
private final WatcherClient watcherClient;
private final MachineLearningClient machineLearning;
private final ILMClient ilmClient;
private final EnrichClient enrichClient;
public XPackClient(Client client) {
this.client = Objects.requireNonNull(client, "client");
@ -53,6 +55,7 @@ public class XPackClient {
this.watcherClient = new WatcherClient(client);
this.machineLearning = new MachineLearningClient(client);
this.ilmClient = new ILMClient(client);
this.enrichClient = new EnrichClient(client);
}
public Client es() {
@ -87,6 +90,10 @@ public class XPackClient {
return ilmClient;
}
public EnrichClient enrichClient() {
return enrichClient;
}
public XPackClient withHeaders(Map<String, String> headers) {
return new XPackClient(client.filterWithHeader(headers));
}

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
@ -57,6 +57,10 @@ import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
@ -427,7 +431,12 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
DeleteDataFrameTransformAction.INSTANCE,
GetDataFrameTransformsAction.INSTANCE,
GetDataFrameTransformsStatsAction.INSTANCE,
PreviewDataFrameTransformAction.INSTANCE
PreviewDataFrameTransformAction.INSTANCE,
// enrich
DeleteEnrichPolicyAction.INSTANCE,
ExecuteEnrichPolicyAction.INSTANCE,
GetEnrichPolicyAction.INSTANCE,
PutEnrichPolicyAction.INSTANCE
);
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.enrich.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import java.util.Objects;
public class EnrichClient {
private final ElasticsearchClient client;
public EnrichClient(ElasticsearchClient client) {
this.client = Objects.requireNonNull(client, "client");
}
public void deleteEnrichPolicy(
final DeleteEnrichPolicyAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
client.execute(DeleteEnrichPolicyAction.INSTANCE, request, listener);
}
public ActionFuture<AcknowledgedResponse> deleteEnrichPolicy(final DeleteEnrichPolicyAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(DeleteEnrichPolicyAction.INSTANCE, request, listener);
return listener;
}
public void executeEnrichPolicy(
final ExecuteEnrichPolicyAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener);
}
public ActionFuture<AcknowledgedResponse> executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener);
return listener;
}
public void getEnrichPolicy(
final GetEnrichPolicyAction.Request request,
final ActionListener<GetEnrichPolicyAction.Response> listener) {
client.execute(GetEnrichPolicyAction.INSTANCE, request, listener);
}
public ActionFuture<GetEnrichPolicyAction.Response> getEnrichPolicy(final GetEnrichPolicyAction.Request request) {
final PlainActionFuture<GetEnrichPolicyAction.Response> listener = PlainActionFuture.newFuture();
client.execute(GetEnrichPolicyAction.INSTANCE, request, listener);
return listener;
}
public void putEnrichPolicy(
final PutEnrichPolicyAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
client.execute(PutEnrichPolicyAction.INSTANCE, request, listener);
}
public ActionFuture<AcknowledgedResponse> putEnrichPolicy(final PutEnrichPolicyAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(PutEnrichPolicyAction.INSTANCE, request, listener);
return listener;
}
}

View File

@ -58,6 +58,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
@ -89,10 +90,12 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
private final Settings settings;
private final Boolean enabled;
private final boolean transportClientMode;
public EnrichPlugin(final Settings settings) {
this.settings = settings;
this.enabled = ENRICH_ENABLED_SETTING.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
}
@Override
@ -106,7 +109,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
return Collections.emptyList();
return emptyList();
}
return Arrays.asList(
@ -124,7 +127,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
if (enabled == false) {
return Collections.emptyList();
return emptyList();
}
return Arrays.asList(
@ -140,6 +143,9 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
if (enabled == false || transportClientMode) {
return emptyList();
}
EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis);

View File

@ -16,7 +16,7 @@ testingConventions {
naming.clear()
naming {
IT {
baseClass 'org.elasticsearch.xpack.ml.client.ESXPackSmokeClientTestCase'
baseClass 'org.elasticsearch.xpack.ESXPackSmokeClientTestCase'
}
}
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.client;
package org.elasticsearch.xpack;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.client;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ESXPackSmokeClientTestCase;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.client.EnrichClient;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class EnrichTransportClientIT extends ESXPackSmokeClientTestCase {
private static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPolicy newInstance) {
assertThat(newInstance.getType(), equalTo(expectedInstance.getType()));
if (newInstance.getQuery() != null) {
// testFromXContent, always shuffles the xcontent and then byte wise the query is different, so we check the parsed version:
assertThat(newInstance.getQuery().getQueryAsMap(), equalTo(expectedInstance.getQuery().getQueryAsMap()));
} else {
assertThat(expectedInstance.getQuery(), nullValue());
}
assertThat(newInstance.getIndices(), equalTo(expectedInstance.getIndices()));
assertThat(newInstance.getMatchField(), equalTo(expectedInstance.getMatchField()));
assertThat(newInstance.getEnrichFields(), equalTo(expectedInstance.getEnrichFields()));
}
public void testEnrichCrud() throws IOException {
Client client = getClient();
XPackClient xPackClient = new XPackClient(client);
EnrichClient enrichClient = xPackClient.enrichClient();
EnrichPolicy policy = new EnrichPolicy("exact_match", null, Collections.emptyList(), "test", Collections.emptyList());
String policyName = "my-policy";
AcknowledgedResponse acknowledgedResponse = enrichClient.putEnrichPolicy(
new PutEnrichPolicyAction.Request(policyName,
policy)).actionGet();
assertTrue(acknowledgedResponse.isAcknowledged());
GetEnrichPolicyAction.Response getResponse = enrichClient.getEnrichPolicy(
new GetEnrichPolicyAction.Request(policyName)).actionGet();
assertThat(getResponse.getPolicies().size(), equalTo(1));
assertThat(policyName, equalTo(getResponse.getPolicies().get(0).getName()));
assertEqualPolicies(policy, getResponse.getPolicies().get(0).getPolicy());
acknowledgedResponse = enrichClient.deleteEnrichPolicy(new DeleteEnrichPolicyAction.Request(policyName)).actionGet();
assertTrue(acknowledgedResponse.isAcknowledged());
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ESXPackSmokeClientTestCase;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;