Add enrich policy execute API (#41762)
This commit wires up the Rest calls and Transport calls for execute enrich policy, as well as tests and rest spec additions.
This commit is contained in:
parent
79fa7d8098
commit
2325ffb757
|
@ -20,7 +20,7 @@ public class DeleteEnrichPolicyAction extends Action<AcknowledgedResponse> {
|
|||
public static final DeleteEnrichPolicyAction INSTANCE = new DeleteEnrichPolicyAction();
|
||||
public static final String NAME = "cluster:admin/xpack/enrich/delete";
|
||||
|
||||
protected DeleteEnrichPolicyAction() {
|
||||
private DeleteEnrichPolicyAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.enrich.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ExecuteEnrichPolicyAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction();
|
||||
public static final String NAME = "cluster:admin/xpack/enrich/execute";
|
||||
|
||||
private ExecuteEnrichPolicyAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
public static class Request extends MasterNodeReadRequest<Request> {
|
||||
|
||||
private final String name;
|
||||
|
||||
public Request(String name) {
|
||||
this.name = Objects.requireNonNull(name, "name cannot be null");
|
||||
}
|
||||
|
||||
public Request(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
name = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(name);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return name.equals(request.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,6 +11,11 @@
|
|||
enrich_values: ["a", "b"]
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
enrich.execute_policy:
|
||||
name: policy-crud
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
enrich.list_policy: {}
|
||||
- length: { policies: 1 }
|
||||
|
|
|
@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -33,12 +33,15 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportListEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
|
||||
|
||||
|
@ -87,7 +90,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
|||
return Arrays.asList(
|
||||
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
|
||||
new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
|
||||
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class)
|
||||
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
|
||||
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -102,7 +106,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
|||
return Arrays.asList(
|
||||
new RestDeleteEnrichPolicyAction(settings, restController),
|
||||
new RestListEnrichPolicyAction(settings, restController),
|
||||
new RestPutEnrichPolicyAction(settings, restController)
|
||||
new RestPutEnrichPolicyAction(settings, restController),
|
||||
new RestExecuteEnrichPolicyAction(settings, restController)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
||||
class EnrichPolicyExecutor {
|
||||
public class EnrichPolicyExecutor {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final Client client;
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
|
||||
import org.elasticsearch.xpack.enrich.PolicyExecutionResult;
|
||||
|
||||
public class TransportExecuteEnrichPolicyAction
|
||||
extends TransportMasterNodeReadAction<ExecuteEnrichPolicyAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private final EnrichPolicyExecutor executor;
|
||||
|
||||
@Inject
|
||||
public TransportExecuteEnrichPolicyAction(TransportService transportService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
EnrichPolicyExecutor executor) {
|
||||
super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse() {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) {
|
||||
executor.runPolicy(request.getName(), new ActionListener<PolicyExecutionResult>() {
|
||||
@Override
|
||||
public void onResponse(PolicyExecutionResult policyExecutionResult) {
|
||||
listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(ExecuteEnrichPolicyAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestExecuteEnrichPolicyAction extends BaseRestHandler {
|
||||
|
||||
public RestExecuteEnrichPolicyAction(final Settings settings, final RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.PUT, "/_enrich/policy/{name}/_execute", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_enrich/policy/{name}/_execute", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "execute_enrich_policy";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
|
||||
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
|
||||
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich.action;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
|
||||
public class ExecuteEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase<ExecuteEnrichPolicyAction.Request> {
|
||||
|
||||
@Override
|
||||
protected ExecuteEnrichPolicyAction.Request createTestInstance() {
|
||||
return new ExecuteEnrichPolicyAction.Request(randomAlphaOfLength(3));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<ExecuteEnrichPolicyAction.Request> instanceReader() {
|
||||
return ExecuteEnrichPolicyAction.Request::new;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"enrich.execute_policy": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-execute-policy.html",
|
||||
"methods": [ "PUT" ],
|
||||
"url": {
|
||||
"path": "/_enrich/policy/{name}/_execute",
|
||||
"paths": ["/_enrich/policy/{name}/_execute"],
|
||||
"parts": {
|
||||
"name": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the enrich policy"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue