From b5dadc733bacf1023422226bee1a9d7e5c7ee17d Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 28 Mar 2017 15:02:39 +0100 Subject: [PATCH] Make Filter actions extend HandledTransportAction (elastic/x-pack-elasticsearch#859) Previously the GET/PUT/DELETE filters actions were master node actions. This is not necessary since the filters are stored in an index rather than the cluster state. This change makes the actions extend `HandledTransportAction` so they can be run on any node. The change also makes PutFilterAction.TransportAction use the TransportBulkAction instead of the deprecated TransportIndexAction. relates elastic/x-pack-elasticsearch#756 Original commit: elastic/x-pack-elasticsearch@c6df04382e736851269192e5b166fc359575b94e --- .../xpack/ml/action/DeleteFilterAction.java | 40 ++++++---------- .../xpack/ml/action/GetFiltersAction.java | 38 +++++---------- .../xpack/ml/action/PutFilterAction.java | 48 +++++++------------ 3 files changed, 43 insertions(+), 83 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java index db8cc2bc152..29aff1d5669 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java @@ -14,14 +14,13 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -33,10 +32,12 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -143,34 +144,28 @@ public class DeleteFilterAction extends Action { + public static class TransportAction extends HandledTransportAction { private final TransportBulkAction transportAction; + private final ClusterService clusterService; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportBulkAction transportAction) { - super(settings, DeleteFilterAction.NAME, transportService, clusterService, threadPool, actionFilters, + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, + JobManager jobManager, Client client, ClusterService clusterService, + TransportBulkAction transportAction) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.clusterService = clusterService; this.transportAction = transportAction; } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void doExecute(Request request, ActionListener listener) { final String filterId = request.getFilterId(); + ClusterState state = clusterService.state(); MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE); Map jobs = currentMlMetadata.getJobs(); List currentlyUsedBy = new ArrayList<>(); @@ -210,11 +205,6 @@ public class DeleteFilterAction extends Action { + public static class TransportAction extends HandledTransportAction { private final TransportGetAction transportGetAction; private final TransportSearchAction transportSearchAction; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportGetAction transportGetAction, TransportSearchAction transportSearchAction) { - super(settings, GetFiltersAction.NAME, transportService, clusterService, threadPool, actionFilters, + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, + JobManager jobManager, Client client, TransportGetAction transportGetAction, + TransportSearchAction transportSearchAction) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); this.transportGetAction = transportGetAction; this.transportSearchAction = transportSearchAction; } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void doExecute(Request request, ActionListener listener) { final String filterId = request.getFilterId(); if (!Strings.isNullOrEmpty(filterId)) { getFilter(filterId, listener); @@ -257,11 +246,6 @@ public class GetFiltersAction extends Action listener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); transportGetAction.execute(getRequest, new ActionListener() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java index 848e55fd85e..1bcc67b7512 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java @@ -13,18 +13,14 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeReadRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,8 +31,10 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -151,38 +149,31 @@ public class PutFilterAction extends Action { + public static class TransportAction extends HandledTransportAction { private final TransportBulkAction transportBulkAction; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportBulkAction transportBulkAction) { - super(settings, PutFilterAction.NAME, transportService, clusterService, threadPool, actionFilters, + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, + JobManager jobManager, Client client, TransportBulkAction transportBulkAction) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); this.transportBulkAction = transportBulkAction; } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void doExecute(Request request, ActionListener listener) { MlFilter filter = request.getFilter(); final String filterId = filter.getId(); IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); - XContentBuilder builder = XContentFactory.jsonBuilder(); - indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS)); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to serialise filter with id [" + filter.getId() + "]", e); + } BulkRequest bulkRequest = new BulkRequest().add(indexRequest); transportBulkAction.execute(bulkRequest, new ActionListener() { @@ -197,11 +188,6 @@ public class PutFilterAction extends Action