Make Filter actions extend HandledTransportAction (elastic/x-pack-elasticsearch#859)

Previously the GET/PUT/DELETE filters actions were master node actions. This is not necessary since the filters are stored in an index rather than the cluster state. This change makes the actions extend `HandledTransportAction` so they can be run on any node.

The change also makes PutFilterAction.TransportAction use the TransportBulkAction instead of the deprecated TransportIndexAction.

relates elastic/x-pack-elasticsearch#756

Original commit: elastic/x-pack-elasticsearch@c6df04382e
This commit is contained in:
Colin Goodheart-Smithe 2017-03-28 15:02:39 +01:00 committed by GitHub
parent b642ba6351
commit b5dadc733b
3 changed files with 43 additions and 83 deletions

View File

@ -14,14 +14,13 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -33,10 +32,12 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -143,34 +144,28 @@ public class DeleteFilterAction extends Action<DeleteFilterAction.Request, Delet
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final TransportBulkAction transportAction;
private final ClusterService clusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, ClusterService clusterService,
TransportBulkAction transportAction) {
super(settings, DeleteFilterAction.NAME, transportService, clusterService, threadPool, actionFilters,
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.transportAction = transportAction;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void doExecute(Request request, ActionListener<Response> listener) {
final String filterId = request.getFilterId();
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE);
Map<String, Job> jobs = currentMlMetadata.getJobs();
List<String> currentlyUsedBy = new ArrayList<>();
@ -210,11 +205,6 @@ public class DeleteFilterAction extends Action<DeleteFilterAction.Request, Delet
}
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -16,15 +16,12 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
@ -43,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -219,34 +217,25 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final TransportGetAction transportGetAction;
private final TransportSearchAction transportSearchAction;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportGetAction transportGetAction, TransportSearchAction transportSearchAction) {
super(settings, GetFiltersAction.NAME, transportService, clusterService, threadPool, actionFilters,
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, TransportGetAction transportGetAction,
TransportSearchAction transportSearchAction) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.transportGetAction = transportGetAction;
this.transportSearchAction = transportSearchAction;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void doExecute(Request request, ActionListener<Response> listener) {
final String filterId = request.getFilterId();
if (!Strings.isNullOrEmpty(filterId)) {
getFilter(filterId, listener);
@ -257,11 +246,6 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
}
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
private void getFilter(String filterId, ActionListener<Response> listener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
transportGetAction.execute(getRequest, new ActionListener<GetResponse>() {

View File

@ -13,18 +13,14 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -35,8 +31,10 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -151,38 +149,31 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
}
// extends TransportMasterNodeAction, because we will store in cluster state.
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final TransportBulkAction transportBulkAction;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction transportBulkAction) {
super(settings, PutFilterAction.NAME, transportService, clusterService, threadPool, actionFilters,
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, TransportBulkAction transportBulkAction) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.transportBulkAction = transportBulkAction;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void doExecute(Request request, ActionListener<Response> listener) {
MlFilter filter = request.getFilter();
final String filterId = filter.getId();
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
XContentBuilder builder = XContentFactory.jsonBuilder();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
} catch (IOException e) {
throw new IllegalStateException(
"Failed to serialise filter with id [" + filter.getId() + "]", e);
}
BulkRequest bulkRequest = new BulkRequest().add(indexRequest);
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
@ -197,11 +188,6 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
}
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}