[ML] Add machine learning privileges/roles (elastic/x-pack-elasticsearch#673)
* Changed ML action names to allow distinguishing of admin and read-only actions using wildcards
* Added manage_ml and monitor_ml built-in privileges as subsets of the existing manage and monitor privileges
* Added out-of-the-box machine_learning_admin and machine_learning_user roles
* Changed machine learning results endpoints to use a NodeClient rather than an InternalClient when searching for results, so that index/document level permissions applied to ML results are respected

Original commit: elastic/x-pack-elasticsearch@eee800aaa8
Parent: 8df7a82435
Commit: 0b7c735aec
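The renames in the diff below follow a single convention: read-only ML actions now live under cluster:monitor/ml/* and mutating ones under cluster:admin/ml/*, so the new privileges can be expressed as wildcards over action names. A minimal, self-contained sketch of that partitioning, using plain regex matching in place of the Lucene automatons that x-pack actually compiles (the action names are taken from this commit):

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    public class MlActionNameMatching {
        // Convert a privilege pattern such as "cluster:monitor/ml/*" to a regex.
        static boolean matches(String wildcard, String action) {
            return Pattern.matches(Pattern.quote(wildcard).replace("*", "\\E.*\\Q"), action);
        }

        public static void main(String[] args) {
            List<String> readOnly = Arrays.asList(
                    "cluster:monitor/ml/anomaly_detectors/results/buckets/get",
                    "cluster:monitor/ml/datafeeds/stats/get");
            List<String> mutating = Arrays.asList(
                    "cluster:admin/ml/anomaly_detectors/update",
                    "cluster:admin/ml/datafeeds/put");

            // monitor_ml ("cluster:monitor/ml/*") grants only the read-only actions...
            readOnly.forEach(a -> System.out.println(matches("cluster:monitor/ml/*", a)));  // true
            mutating.forEach(a -> System.out.println(matches("cluster:monitor/ml/*", a))); // false
            // ...while manage_ml ("cluster:admin/ml/*" plus "cluster:monitor/ml/*") grants both.
            mutating.forEach(a -> System.out.println(
                    matches("cluster:admin/ml/*", a) || matches("cluster:monitor/ml/*", a))); // true
        }
    }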
@@ -264,7 +264,7 @@ public class MachineLearning implements ActionPlugin {
         JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient);
         Auditor auditor = new Auditor(internalClient, clusterService);
-        JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor);
+        JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, internalClient);
         AutodetectProcessFactory autodetectProcessFactory;
         NormalizerProcessFactory normalizerProcessFactory;
         if (AUTODETECT_PROCESS.get(settings)) {
@@ -13,7 +13,6 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -145,16 +144,14 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
     public static class TransportAction extends TransportMasterNodeAction<Request, Response> {

         private final JobManager jobManager;
-        private final Client client;

         @Inject
         public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-                               ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
-                               Client client) {
+                               ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                               JobManager jobManager) {
             super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
                     indexNameExpressionResolver, Request::new);
             this.jobManager = jobManager;
-            this.client = client;
         }

         @Override
@@ -169,7 +166,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc

         @Override
         protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
-            jobManager.deleteJob(request, client, (JobStorageDeletionTask) task, listener);
+            jobManager.deleteJob(request, (JobStorageDeletionTask) task, listener);
         }

         @Override
@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.common.ParseField;
@@ -44,7 +45,7 @@ import java.util.Objects;
 public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucketsAction.Response, GetBucketsAction.RequestBuilder> {

     public static final GetBucketsAction INSTANCE = new GetBucketsAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/results/buckets/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/results/buckets/get";

     private GetBucketsAction() {
         super(NAME);
@@ -387,14 +388,16 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket

         private final JobProvider jobProvider;
         private final JobManager jobManager;
+        private final Client client;

         @Inject
         public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               JobProvider jobProvider, JobManager jobManager) {
+                               JobProvider jobProvider, JobManager jobManager, Client client) {
             super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
             this.jobProvider = jobProvider;
             this.jobManager = jobManager;
+            this.client = client;
         }

         @Override
@@ -420,7 +423,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
                 query.start(request.start);
                 query.end(request.end);
             }
-            jobProvider.buckets(request.jobId, query.build(), q -> listener.onResponse(new Response(q)), listener::onFailure);
+            jobProvider.buckets(request.jobId, query.build(), q -> listener.onResponse(new Response(q)), listener::onFailure, client);
         }
     }

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.common.ParseField;
@@ -43,7 +44,7 @@ public class GetCategoriesAction extends
         Action<GetCategoriesAction.Request, GetCategoriesAction.Response, GetCategoriesAction.RequestBuilder> {

     public static final GetCategoriesAction INSTANCE = new GetCategoriesAction();
-    private static final String NAME = "cluster:admin/ml/anomaly_detectors/results/categories/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/results/categories/get";

     private GetCategoriesAction() {
         super(NAME);
@@ -236,12 +237,14 @@ Action<GetCategoriesAction.Request, GetCategoriesAction.Response, GetCategoriesA
     public static class TransportAction extends HandledTransportAction<Request, Response> {

         private final JobProvider jobProvider;
+        private final Client client;

         @Inject
         public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
-                               IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider) {
+                               IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, Client client) {
             super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
             this.jobProvider = jobProvider;
+            this.client = client;
         }

         @Override
@@ -249,7 +252,7 @@ Action<GetCategoriesAction.Request, GetCategoriesAction.Response, GetCategoriesA
             Integer from = request.pageParams != null ? request.pageParams.getFrom() : null;
             Integer size = request.pageParams != null ? request.pageParams.getSize() : null;
             jobProvider.categoryDefinitions(request.jobId, request.categoryId, from, size,
-                    r -> listener.onResponse(new Response(r)), listener::onFailure);
+                    r -> listener.onResponse(new Response(r)), listener::onFailure, client);
         }
     }
 }
@@ -43,7 +43,7 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
         GetDatafeedsAction.RequestBuilder> {

     public static final GetDatafeedsAction INSTANCE = new GetDatafeedsAction();
-    public static final String NAME = "cluster:admin/ml/datafeeds/get";
+    public static final String NAME = "cluster:monitor/ml/datafeeds/get";

     public static final String ALL = "_all";

@@ -51,7 +51,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
         GetDatafeedsStatsAction.RequestBuilder> {

     public static final GetDatafeedsStatsAction INSTANCE = new GetDatafeedsStatsAction();
-    public static final String NAME = "cluster:admin/ml/datafeeds/stats/get";
+    public static final String NAME = "cluster:monitor/ml/datafeeds/stats/get";

     public static final String ALL = "_all";
     private static final String STATE = "state";
@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.common.ParseField;
@@ -43,7 +44,7 @@ public class GetInfluencersAction
         extends Action<GetInfluencersAction.Request, GetInfluencersAction.Response, GetInfluencersAction.RequestBuilder> {

     public static final GetInfluencersAction INSTANCE = new GetInfluencersAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/results/influencers/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/results/influencers/get";

     private GetInfluencersAction() {
         super(NAME);
@@ -300,12 +301,14 @@ extends Action<GetInfluencersAction.Request, GetInfluencersAction.Response, GetI
     public static class TransportAction extends HandledTransportAction<Request, Response> {

         private final JobProvider jobProvider;
+        private final Client client;

         @Inject
         public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
-                               IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider) {
+                               IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider, Client client) {
             super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
             this.jobProvider = jobProvider;
+            this.client = client;
         }

         @Override
@@ -313,7 +316,7 @@ extends Action<GetInfluencersAction.Request, GetInfluencersAction.Response, GetI
             InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder().includeInterim(request.includeInterim)
                     .start(request.start).end(request.end).from(request.pageParams.getFrom()).size(request.pageParams.getSize())
                     .anomalyScoreThreshold(request.anomalyScoreFilter).sortField(request.sort).sortDescending(request.decending).build();
-            jobProvider.influencers(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure);
+            jobProvider.influencers(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure, client);
         }
     }

@@ -39,7 +39,7 @@ import java.util.Objects;
 public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.Response, GetJobsAction.RequestBuilder> {

     public static final GetJobsAction INSTANCE = new GetJobsAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/get";

     private GetJobsAction() {
         super(NAME);
@@ -64,7 +64,7 @@ import java.util.stream.Collectors;
 public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJobsStatsAction.Response, GetJobsStatsAction.RequestBuilder> {

     public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/stats/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/stats/get";

     private static final String DATA_COUNTS = "data_counts";
     private static final String MODEL_SIZE_STATS = "model_size_stats";
@@ -43,7 +43,7 @@ public class GetModelSnapshotsAction
         extends Action<GetModelSnapshotsAction.Request, GetModelSnapshotsAction.Response, GetModelSnapshotsAction.RequestBuilder> {

     public static final GetModelSnapshotsAction INSTANCE = new GetModelSnapshotsAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/model_snapshots/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/model_snapshots/get";

     private GetModelSnapshotsAction() {
         super(NAME);
@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.common.ParseField;
@@ -44,7 +45,7 @@ import java.util.Objects;
 public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecordsAction.Response, GetRecordsAction.RequestBuilder> {

     public static final GetRecordsAction INSTANCE = new GetRecordsAction();
-    public static final String NAME = "cluster:admin/ml/anomaly_detectors/results/records/get";
+    public static final String NAME = "cluster:monitor/ml/anomaly_detectors/results/records/get";

     private GetRecordsAction() {
         super(NAME);
@@ -338,14 +339,16 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord

         private final JobProvider jobProvider;
         private final JobManager jobManager;
+        private final Client client;

         @Inject
         public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               JobProvider jobProvider, JobManager jobManager) {
+                               JobProvider jobProvider, JobManager jobManager, Client client) {
             super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
             this.jobProvider = jobProvider;
             this.jobManager = jobManager;
+            this.client = client;
         }

         @Override
@@ -363,7 +366,7 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord
                     .sortField(request.sort)
                     .sortDescending(request.decending)
                     .build();
-            jobProvider.records(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure);
+            jobProvider.records(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure, client);
         }
     }

@@ -37,7 +37,10 @@ public class MlDeleteByQueryAction extends Action<DeleteByQueryRequest, BulkBySc
         MlDeleteByQueryAction.MlDeleteByQueryRequestBuilder> {

     public static final MlDeleteByQueryAction INSTANCE = new MlDeleteByQueryAction();
-    public static final String NAME = "indices:data/write/delete/mlbyquery";
+    // TODO: Ideally we'd use an "internal" action here as we don't want transport client users running it, but unfortunately the internal
+    // _xpack user is forbidden to run "internal" actions.  Putting "internal" at the top level of the action name at least restricts it to
+    // superusers, and makes clear to anyone who sees the name that it's not for general use.
+    public static final String NAME = "indices:internal/data/write/mldeletebyquery";

     private MlDeleteByQueryAction() {
         super(NAME);
@@ -44,7 +44,7 @@ import java.util.concurrent.Semaphore;

 public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> {
     public static final UpdateJobAction INSTANCE = new UpdateJobAction();
-    public static final String NAME = "cluster:admin/ml/job/update";
+    public static final String NAME = "cluster:admin/ml/anomaly_detectors/update";

     private UpdateJobAction() {
         super(NAME);
@@ -226,4 +226,4 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
             return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
         }
     }
-}
+}
@@ -36,7 +36,7 @@ public class UpdateProcessAction extends
         Action<UpdateProcessAction.Request, UpdateProcessAction.Response, UpdateProcessAction.RequestBuilder> {

     public static final UpdateProcessAction INSTANCE = new UpdateProcessAction();
-    public static final String NAME = "cluster:admin/ml/job/update/process";
+    public static final String NAME = "internal:admin/ml/anomaly_detectors/update/process";

     private UpdateProcessAction() {
         super(NAME);
@@ -229,7 +229,7 @@ public class DatafeedJobRunner extends AbstractComponent {
                 .sortDescending(true).size(1)
                 .includeInterim(false)
                 .build();
-        jobProvider.buckets(jobId, latestBucketQuery, buckets -> {
+        jobProvider.bucketsViaInternalClient(jobId, latestBucketQuery, buckets -> {
             jobProvider.dataCounts(jobId, dataCounts -> handler.accept(buckets, dataCounts), errorHandler);
         }, e -> {
             if (e instanceof ResourceNotFoundException) {
@@ -59,17 +59,19 @@ public class JobManager extends AbstractComponent {
     private final ClusterService clusterService;
     private final JobResultsPersister jobResultsPersister;
     private final Auditor auditor;
+    private final Client client;

     /**
      * Create a JobManager
      */
     public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
-                      ClusterService clusterService, Auditor auditor) {
+                      ClusterService clusterService, Auditor auditor, Client client) {
         super(settings);
         this.jobProvider = Objects.requireNonNull(jobProvider);
-        this.clusterService = clusterService;
-        this.jobResultsPersister = jobResultsPersister;
-        this.auditor = auditor;
+        this.clusterService = Objects.requireNonNull(clusterService);
+        this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
+        this.auditor = Objects.requireNonNull(auditor);
+        this.client = Objects.requireNonNull(client);
     }

     /**
@@ -248,8 +250,7 @@ public class JobManager extends AbstractComponent {
         return buildNewClusterState(currentState, builder);
     }

-
-    public void deleteJob(DeleteJobAction.Request request, Client client, JobStorageDeletionTask task,
+    public void deleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
                           ActionListener<DeleteJobAction.Response> actionListener) {

         String jobId = request.getJobId();
@@ -296,7 +297,7 @@ public class JobManager extends AbstractComponent {
         }

         // This task manages the physical deletion of the job (removing the results, then the index)
-        task.delete(jobId, client, clusterService.state(),
+        task.delete(jobId, client, clusterService.state(),
                 deleteJobStateHandler::accept, actionListener::onFailure);
     };

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
@@ -43,6 +44,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -50,6 +52,10 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.ml.action.GetBucketsAction;
+import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
+import org.elasticsearch.xpack.ml.action.GetInfluencersAction;
+import org.elasticsearch.xpack.ml.action.GetRecordsAction;
 import org.elasticsearch.xpack.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.config.MlFilter;
@@ -68,6 +74,7 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
 import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
 import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import org.elasticsearch.xpack.ml.job.results.Result;
+import org.elasticsearch.xpack.security.support.Exceptions;

 import java.io.IOException;
 import java.io.OutputStream;
@@ -302,9 +309,21 @@ public class JobProvider {

     /**
      * Search for buckets with the parameters in the {@link BucketsQueryBuilder}
+     * Uses the internal client, so runs as the _xpack user
      */
-    public void buckets(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler)
+    public void bucketsViaInternalClient(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler,
+                                         Consumer<Exception> errorHandler)
             throws ResourceNotFoundException {
+        buckets(jobId, query, handler, errorHandler, client);
+    }
+
+    /**
+     * Search for buckets with the parameters in the {@link BucketsQueryBuilder}
+     * Uses a supplied client, so may run as the currently authenticated user
+     */
+    public void buckets(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler,
+                        Client client) throws ResourceNotFoundException {

         ResultsFilterBuilder rfb = new ResultsFilterBuilder();
         if (query.getTimestamp() != null) {
             rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getTimestamp());
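The pair of methods above establishes the pattern used for every results query in JobProvider: a *ViaInternalClient overload stays bound to the internal client for internal callers such as DatafeedJobRunner, while the client-parameterized overload is what the Get*Action transport actions call with a NodeClient, so the search runs with the requesting user's permissions and index/document level security on the results indices is enforced. A stripped-down sketch of the shape, with hypothetical names rather than the actual x-pack types:

    import java.util.function.Consumer;

    interface SearchClient {
        String search(String index);
    }

    class ResultsProvider {
        private final SearchClient internalClient; // runs as the _xpack user

        ResultsProvider(SearchClient internalClient) {
            this.internalClient = internalClient;
        }

        // For internal machinery (e.g. datafeed bookkeeping): always authorized.
        void resultsViaInternalClient(String jobId, Consumer<String> handler) {
            results(jobId, handler, internalClient);
        }

        // For user-facing transport actions: the supplied client may carry the
        // authenticated user's context, so security filters apply to the search.
        void results(String jobId, Consumer<String> handler, SearchClient client) {
            handler.accept(client.search(".ml-anomalies-" + jobId));
        }
    }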
@@ -342,7 +361,7 @@ public class JobProvider {
         client.multiSearch(mrequest, ActionListener.wrap(mresponse -> {
             MultiSearchResponse.Item item1 = mresponse.getResponses()[0];
             if (item1.isFailure()) {
-                errorHandler.accept(item1.getFailure());
+                errorHandler.accept(mapAuthFailure(item1.getFailure(), jobId, GetBucketsAction.NAME));
                 return;
             }

@@ -352,7 +371,7 @@ public class JobProvider {
             if (hits.getTotalHits() == 0) {
                 throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
             } else if (hits.getTotalHits() > 1) {
-                LOGGER.error("Found more than one bucket with timestamp [{}]" + " from index {}", query.getTimestamp(), indexName);
+                LOGGER.error("Found more than one bucket with timestamp [{}] from index {}", query.getTimestamp(), indexName);
             }
         }

@@ -387,14 +406,14 @@ public class JobProvider {
                 if (query.isExpand()) {
                     Iterator<Bucket> bucketsToExpand = buckets.results().stream()
                             .filter(bucket -> bucket.getRecordCount() > 0).iterator();
-                    expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler);
+                    expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client);
                     return;
                 }
             } else {
                 if (query.isExpand()) {
                     Iterator<Bucket> bucketsToExpand = buckets.results().stream()
                             .filter(bucket -> bucket.getRecordCount() > 0).iterator();
-                    expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler);
+                    expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client);
                     return;
                 }
             }
@@ -403,12 +422,12 @@ public class JobProvider {
     }

     private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
-                               int from, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler) {
+                               int from, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) {
         if (bucketsToExpand.hasNext()) {
             Consumer<Integer> c = i -> {
-                expandBuckets(jobId, query, buckets, bucketsToExpand, from + RECORDS_SIZE_PARAM, handler, errorHandler);
+                expandBuckets(jobId, query, buckets, bucketsToExpand, from + RECORDS_SIZE_PARAM, handler, errorHandler, client);
             };
-            expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), from, c, errorHandler);
+            expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), from, c, errorHandler, client);
         } else {
             handler.accept(buckets);
         }
@@ -491,27 +510,31 @@ public class JobProvider {
         return new BatchedRecordsIterator(client, jobId);
     }

     /**
      * Expand a bucket with its records
      */
     // TODO (norelease): Use scroll search instead of multiple searches with increasing from
     public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
-                             Consumer<Integer> consumer, Consumer<Exception> errorHandler) {
+                             Consumer<Integer> consumer, Consumer<Exception> errorHandler, Client client) {
         Consumer<QueryPage<AnomalyRecord>> h = page -> {
             bucket.getRecords().addAll(page.results());
             if (partitionFieldValue != null) {
                 bucket.setAnomalyScore(bucket.partitionAnomalyScore(partitionFieldValue));
             }
             if (page.count() > from + RECORDS_SIZE_PARAM) {
-                expandBucket(jobId, includeInterim, bucket, partitionFieldValue, from + RECORDS_SIZE_PARAM, consumer, errorHandler);
+                expandBucket(jobId, includeInterim, bucket, partitionFieldValue, from + RECORDS_SIZE_PARAM, consumer, errorHandler,
+                        client);
             } else {
                 consumer.accept(bucket.getRecords().size());
             }
         };
         bucketRecords(jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
-                false, partitionFieldValue, h, errorHandler);
+                false, partitionFieldValue, h, errorHandler, client);
     }

     void bucketRecords(String jobId, Bucket bucket, int from, int size, boolean includeInterim, String sortField,
                        boolean descending, String partitionFieldValue, Consumer<QueryPage<AnomalyRecord>> handler,
-                       Consumer<Exception> errorHandler) {
+                       Consumer<Exception> errorHandler, Client client) {
         // Find the records using the time stamp rather than a parent-child
         // relationship.  The parent-child filter involves two queries behind
         // the scenes, and Elasticsearch documentation claims it's significantly
@@ -533,19 +556,32 @@ public class JobProvider {
                     .order(descending ? SortOrder.DESC : SortOrder.ASC);
         }

-        records(jobId, from, size, recordFilter, sb, SECONDARY_SORT, descending, handler, errorHandler);
+        records(jobId, from, size, recordFilter, sb, SECONDARY_SORT, descending, handler, errorHandler, client);
     }

     /**
      * Get a page of {@linkplain CategoryDefinition}s for the given <code>jobId</code>.
      *
+     * Uses the internal client, so runs as the _xpack user
      * @param jobId the job id
      * @param from  Skip the first N categories. This parameter is for paging
      * @param size  Take only this number of categories
      */
+    public void categoryDefinitionsViaInternalClient(String jobId, String categoryId, Integer from, Integer size,
+                                                     Consumer<QueryPage<CategoryDefinition>> handler,
+                                                     Consumer<Exception> errorHandler) {
+        categoryDefinitions(jobId, categoryId, from, size, handler, errorHandler, client);
+    }
+
+    /**
+     * Get a page of {@linkplain CategoryDefinition}s for the given <code>jobId</code>.
+     * Uses a supplied client, so may run as the currently authenticated user
+     * @param jobId the job id
+     * @param from  Skip the first N categories. This parameter is for paging
+     * @param size  Take only this number of categories
+     */
     public void categoryDefinitions(String jobId, String categoryId, Integer from, Integer size,
                                     Consumer<QueryPage<CategoryDefinition>> handler,
-                                    Consumer<Exception> errorHandler) {
+                                    Consumer<Exception> errorHandler, Client client) {
         if (categoryId != null && (from != null || size != null)) {
             throw new IllegalStateException("Both categoryId and pageParams are specified");
         }
@@ -585,15 +621,26 @@ public class JobProvider {
             QueryPage<CategoryDefinition> result =
                     new QueryPage<>(results, searchResponse.getHits().getTotalHits(), CategoryDefinition.RESULTS_FIELD);
             handler.accept(result);
-        }, errorHandler::accept));
+        }, e -> errorHandler.accept(mapAuthFailure(e, jobId, GetCategoriesAction.NAME))));
     }

     /**
      * Search for anomaly records with the parameters in the
      * {@link org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder.RecordsQuery}
+     * Uses the internal client, so runs as the _xpack user
      */
+    public void recordsViaInternalClient(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer<QueryPage<AnomalyRecord>> handler,
+                                         Consumer<Exception> errorHandler) {
+        records(jobId, query, handler, errorHandler, client);
+    }
+
+    /**
+     * Search for anomaly records with the parameters in the
+     * {@link org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder.RecordsQuery}
+     * Uses a supplied client, so may run as the currently authenticated user
+     */
     public void records(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer<QueryPage<AnomalyRecord>> handler,
-                        Consumer<Exception> errorHandler) {
+                        Consumer<Exception> errorHandler, Client client) {
         QueryBuilder fb = new ResultsFilterBuilder()
                 .timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
                 .score(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreThreshold())
@@ -606,7 +653,7 @@ public class JobProvider {
                     .missing("_last")
                     .order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
         }
-        records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending(), handler, errorHandler);
+        records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending(), handler, errorHandler, client);
     }

     /**
@@ -615,7 +662,7 @@ public class JobProvider {
     private void records(String jobId, int from, int size,
                          QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
                          boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
-                         Consumer<Exception> errorHandler) {
+                         Consumer<Exception> errorHandler, Client client) {
         String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);

         recordFilter = new BoolQueryBuilder()
@@ -653,18 +700,28 @@ public class JobProvider {
             QueryPage<AnomalyRecord> queryPage =
                     new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD);
             handler.accept(queryPage);
-        }, errorHandler::accept));
+        }, e -> errorHandler.accept(mapAuthFailure(e, jobId, GetRecordsAction.NAME))));
     }

     /**
-     * Return a page of influencers for the given job and within the given date
-     * range
-     *
+     * Return a page of influencers for the given job and within the given date range
+     * Uses the internal client, so runs as the _xpack user
      * @param jobId The job ID for which influencers are requested
      * @param query the query
      */
+    public void influencersViaInternalClient(String jobId, InfluencersQuery query, Consumer<QueryPage<Influencer>> handler,
+                                             Consumer<Exception> errorHandler) {
+        influencers(jobId, query, handler, errorHandler, client);
+    }
+
+    /**
+     * Return a page of influencers for the given job and within the given date range
+     * Uses a supplied client, so may run as the currently authenticated user
+     * @param jobId The job ID for which influencers are requested
+     * @param query the query
+     */
     public void influencers(String jobId, InfluencersQuery query, Consumer<QueryPage<Influencer>> handler,
-                            Consumer<Exception> errorHandler) {
+                            Consumer<Exception> errorHandler, Client client) {
         QueryBuilder fb = new ResultsFilterBuilder()
                 .timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
                 .score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
@@ -701,7 +758,7 @@ public class JobProvider {
             }
             QueryPage<Influencer> result = new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD);
             handler.accept(result);
-        }, errorHandler));
+        }, e -> errorHandler.accept(mapAuthFailure(e, jobId, GetInfluencersAction.NAME))));
     }

     /**
@@ -956,4 +1013,32 @@ public class JobProvider {
     public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, Set<String> ids) {
         mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER);
     }
+
+    /**
+     * Maps authorization failures when querying ML indexes to job-specific authorization failures attributed to the ML actions.
+     * Works by replacing the action name with another provided by the caller, and appending the job ID.
+     * This is designed to improve understandability when an admin has applied index or document level security to the .ml-anomalies
+     * indexes to allow some users to have access to certain job results but not others.
+     * For example, if user ml_test is allowed to see some results, but not the ones for job "farequote" then:
+     *
+     * action [indices:data/read/search] is unauthorized for user [ml_test]
+     *
+     * gets mapped to:
+     *
+     * action [cluster:monitor/ml/anomaly_detectors/results/buckets/get] is unauthorized for user [ml_test] for job [farequote]
+     *
+     * Exceptions that are not related to authorization are returned unaltered.
+     * @param e An exception that occurred while getting ML data
+     * @param jobId The job ID
+     * @param mappedActionName The outermost action name, that will make sense to the user who made the request
+     */
+    static Exception mapAuthFailure(Exception e, String jobId, String mappedActionName) {
+        if (e instanceof ElasticsearchStatusException) {
+            if (((ElasticsearchStatusException)e).status() == RestStatus.FORBIDDEN) {
+                e = Exceptions.authorizationError(
+                        e.getMessage().replaceFirst("action \\[.*?\\]", "action [" + mappedActionName + "]") + " for job [{}]", jobId);
+            }
+        }
+        return e;
+    }
 }
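The example in the javadoc can be reproduced with the same replaceFirst call that mapAuthFailure uses. A standalone sketch (the real code routes the result through Exceptions.authorizationError, which fills in the {} placeholder):

    public class MapAuthFailureDemo {
        public static void main(String[] args) {
            // Message shape produced by the security layer for a denied search:
            String raw = "action [indices:data/read/search] is unauthorized for user [ml_test]";
            String mappedActionName = "cluster:monitor/ml/anomaly_detectors/results/buckets/get";
            String jobId = "farequote";

            // Same rewrite as mapAuthFailure: swap the action name, append the job ID.
            String mapped = raw.replaceFirst("action \\[.*?\\]", "action [" + mappedActionName + "]")
                    + " for job [" + jobId + "]";
            System.out.println(mapped);
            // action [cluster:monitor/ml/anomaly_detectors/results/buckets/get]
            //     is unauthorized for user [ml_test] for job [farequote]
        }
    }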
@@ -26,22 +26,26 @@ public final class ClusterPrivilege extends Privilege {
     // shared automatons
     private static final Automaton MANAGE_SECURITY_AUTOMATON = patterns("cluster:admin/xpack/security/*");
     private static final Automaton MONITOR_AUTOMATON = patterns("cluster:monitor/*");
+    private static final Automaton MONITOR_ML_AUTOMATON = patterns("cluster:monitor/ml/*");
     private static final Automaton ALL_CLUSTER_AUTOMATON = patterns("cluster:*", "indices:admin/template/*");
     private static final Automaton MANAGE_AUTOMATON = minusAndMinimize(ALL_CLUSTER_AUTOMATON, MANAGE_SECURITY_AUTOMATON);
+    private static final Automaton MANAGE_ML_AUTOMATON = patterns("cluster:admin/ml/*", "cluster:monitor/ml/*");
     private static final Automaton TRANSPORT_CLIENT_AUTOMATON = patterns("cluster:monitor/nodes/liveness", "cluster:monitor/state");
     private static final Automaton MANAGE_IDX_TEMPLATE_AUTOMATON = patterns("indices:admin/template/*");
     private static final Automaton MANAGE_INGEST_PIPELINE_AUTOMATON = patterns("cluster:admin/ingest/pipeline/*");

     public static final ClusterPrivilege NONE = new ClusterPrivilege("none", Automatons.EMPTY);
     public static final ClusterPrivilege ALL = new ClusterPrivilege("all", ALL_CLUSTER_AUTOMATON);
-    public static final ClusterPrivilege MONITOR = new ClusterPrivilege("monitor", MONITOR_AUTOMATON);
-    public static final ClusterPrivilege MANAGE = new ClusterPrivilege("manage", MANAGE_AUTOMATON);
+    public static final ClusterPrivilege MONITOR = new ClusterPrivilege("monitor", MONITOR_AUTOMATON);
+    public static final ClusterPrivilege MONITOR_ML = new ClusterPrivilege("monitor_ml", MONITOR_ML_AUTOMATON);
+    public static final ClusterPrivilege MANAGE = new ClusterPrivilege("manage", MANAGE_AUTOMATON);
+    public static final ClusterPrivilege MANAGE_ML = new ClusterPrivilege("manage_ml", MANAGE_ML_AUTOMATON);
     public static final ClusterPrivilege MANAGE_IDX_TEMPLATES =
             new ClusterPrivilege("manage_index_templates", MANAGE_IDX_TEMPLATE_AUTOMATON);
     public static final ClusterPrivilege MANAGE_INGEST_PIPELINES =
             new ClusterPrivilege("manage_ingest_pipelines", MANAGE_INGEST_PIPELINE_AUTOMATON);
-    public static final ClusterPrivilege TRANSPORT_CLIENT = new ClusterPrivilege("transport_client", TRANSPORT_CLIENT_AUTOMATON);
-    public static final ClusterPrivilege MANAGE_SECURITY = new ClusterPrivilege("manage_security", MANAGE_SECURITY_AUTOMATON);
+    public static final ClusterPrivilege TRANSPORT_CLIENT = new ClusterPrivilege("transport_client", TRANSPORT_CLIENT_AUTOMATON);
+    public static final ClusterPrivilege MANAGE_SECURITY = new ClusterPrivilege("manage_security", MANAGE_SECURITY_AUTOMATON);
     public static final ClusterPrivilege MANAGE_PIPELINE = new ClusterPrivilege("manage_pipeline", "cluster:admin/ingest/pipeline/*");

     public static final Predicate<String> ACTION_MATCHER = ClusterPrivilege.ALL.predicate();
@@ -50,7 +54,9 @@ public final class ClusterPrivilege extends Privilege {
             .put("none", NONE)
             .put("all", ALL)
             .put("monitor", MONITOR)
+            .put("monitor_ml", MONITOR_ML)
             .put("manage", MANAGE)
+            .put("manage_ml", MANAGE_ML)
             .put("manage_index_templates", MANAGE_IDX_TEMPLATES)
             .put("manage_ingest_pipelines", MANAGE_INGEST_PIPELINES)
             .put("transport_client", TRANSPORT_CLIENT)
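As the commit message states, monitor_ml and manage_ml are subsets of the existing monitor and manage privileges by construction: cluster:monitor/ml/* is contained in cluster:monitor/*, and manage covers everything under cluster:* except the security actions. A toy illustration of that set difference over a finite universe of action names (the real minusAndMinimize operates on automatons, not sets):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ManagePrivilegeDemo {
        public static void main(String[] args) {
            // Toy universe of action names; real automatons range over all actions.
            List<String> allCluster = Arrays.asList(
                    "cluster:admin/ml/datafeeds/put",
                    "cluster:monitor/ml/datafeeds/get",
                    "cluster:admin/xpack/security/user/put");

            // MANAGE = ALL_CLUSTER minus MANAGE_SECURITY, mirroring
            // minusAndMinimize(ALL_CLUSTER_AUTOMATON, MANAGE_SECURITY_AUTOMATON).
            List<String> manage = allCluster.stream()
                    .filter(a -> !a.startsWith("cluster:admin/xpack/security/"))
                    .collect(Collectors.toList());

            System.out.println(manage.contains("cluster:admin/ml/datafeeds/put"));        // true
            System.out.println(manage.contains("cluster:monitor/ml/datafeeds/get"));      // true
            System.out.println(manage.contains("cluster:admin/xpack/security/user/put")); // false
        }
    }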
@@ -62,6 +62,14 @@ public class ReservedRolesStore {
                         null, null, MetadataUtils.DEFAULT_RESERVED_METADATA))
                 .put("beats_system", new RoleDescriptor("beats_system", new String[] { "monitor", MonitoringBulkAction.NAME},
                         null, null, MetadataUtils.DEFAULT_RESERVED_METADATA))
+                .put("machine_learning_user", new RoleDescriptor("machine_learning_user", new String[] { "monitor_ml" },
+                        new RoleDescriptor.IndicesPrivileges[] { RoleDescriptor.IndicesPrivileges.builder().indices(".ml-anomalies*",
+                                ".ml-notifications").privileges("view_index_metadata", "read").build() },
+                        null, MetadataUtils.DEFAULT_RESERVED_METADATA))
+                .put("machine_learning_admin", new RoleDescriptor("machine_learning_admin", new String[] { "manage_ml" },
+                        new RoleDescriptor.IndicesPrivileges[] {
+                                RoleDescriptor.IndicesPrivileges.builder().indices(".ml-*").privileges("view_index_metadata", "read")
+                                        .build() }, null, MetadataUtils.DEFAULT_RESERVED_METADATA))
                 .immutableMap();
     }

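The two out-of-the-box roles pair a cluster privilege with read-only index privileges: machine_learning_user gets monitor_ml plus read access to the results (.ml-anomalies*) and notifications (.ml-notifications) indices, while machine_learning_admin gets manage_ml plus read access to every .ml-* index. A small sketch of how those index patterns resolve against concrete index names (simple trailing-wildcard matching standing in for x-pack's automaton-based index matching; .ml-anomalies-farequote is a hypothetical per-job results index named after the example job in the mapAuthFailure javadoc):

    public class MlRoleIndexPatterns {
        // Illustration only: trailing-'*' wildcard match in place of the
        // automaton-based index name matching that x-pack actually performs.
        static boolean covered(String pattern, String index) {
            return pattern.endsWith("*")
                    ? index.startsWith(pattern.substring(0, pattern.length() - 1))
                    : index.equals(pattern);
        }

        public static void main(String[] args) {
            // machine_learning_user: results and notifications only.
            System.out.println(covered(".ml-anomalies*", ".ml-anomalies-farequote")); // true
            System.out.println(covered(".ml-anomalies*", ".ml-state"));               // false
            // machine_learning_admin: every ML index.
            System.out.println(covered(".ml-*", ".ml-state"));                        // true
        }
    }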
@@ -151,7 +151,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
             Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
             consumer.accept(new ResourceNotFoundException("dummy"));
             return null;
-        }).when(jobProvider).buckets(any(), any(), any(), any());
+        }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());

         doAnswer(invocationOnMock -> {
             @SuppressWarnings("rawtypes")
@@ -396,7 +396,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         }, e -> {
             errorHolder.set(e);
             latch.countDown();
-        });
+        }, client());
         latch.await();
         if (errorHolder.get() != null) {
             throw errorHolder.get();
@@ -414,7 +414,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         }, e -> {
             errorHolder.set(e);
             latch.countDown();
-        });
+        }, client());
         latch.await();
         if (errorHolder.get() != null) {
             throw errorHolder.get();
@@ -450,7 +450,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         }, e -> {
             errorHolder.set(e);
             latch.countDown();
-        });
+        }, client());
         latch.await();
         if (errorHolder.get() != null) {
             throw errorHolder.get();
@@ -468,7 +468,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         }, e -> {
             errorHolder.set(e);
             latch.countDown();
-        });
+        }, client());
         latch.await();
         if (errorHolder.get() != null) {
             throw errorHolder.get();
@@ -461,7 +461,7 @@ public class DatafeedJobIT extends ESRestTestCase {

     @After
     public void clearMlState() throws Exception {
-        new MlRestTestStateCleaner(logger, client(), this).clearMlMetadata();
+        new MlRestTestStateCleaner(logger, adminClient(), this).clearMlMetadata();
     }

     private static class DatafeedBuilder {
@@ -604,6 +604,6 @@ public class MlJobIT extends ESRestTestCase {

     @After
     public void clearMlState() throws IOException {
-        new MlRestTestStateCleaner(logger, client(), this).clearMlMetadata();
+        new MlRestTestStateCleaner(logger, adminClient(), this).clearMlMetadata();
     }
 }
@@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.test.rest.ESRestTestCase;
-import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;

 import java.io.IOException;
 import java.util.Collections;
@@ -19,12 +18,12 @@ import java.util.Map;
 public class MlRestTestStateCleaner {

     private final Logger logger;
-    private final RestClient client;
+    private final RestClient adminClient;
     private final ESRestTestCase testCase;

-    public MlRestTestStateCleaner(Logger logger, RestClient client, ESRestTestCase testCase) {
+    public MlRestTestStateCleaner(Logger logger, RestClient adminClient, ESRestTestCase testCase) {
         this.logger = logger;
-        this.client = client;
+        this.adminClient = adminClient;
         this.testCase = testCase;
     }

@@ -36,7 +35,7 @@ public class MlRestTestStateCleaner {

     @SuppressWarnings("unchecked")
     private void deleteAllDatafeeds() throws IOException {
-        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(client.performRequest("GET", "/_cluster/state",
+        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
                 Collections.singletonMap("filter_path", "metadata.ml.datafeeds")));
         List<Map<String, Object>> datafeeds =
                 (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateAsMap);
@@ -47,7 +46,11 @@ public class MlRestTestStateCleaner {
         for (Map<String, Object> datafeed : datafeeds) {
             String datafeedId = (String) datafeed.get("datafeed_id");
             try {
-                client.performRequest("POST", "/_xpack/ml/datafeeds/" + datafeedId + "/_stop");
+                int statusCode = adminClient.performRequest("POST",
+                        "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode();
+                if (statusCode != 200) {
+                    logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
+                }
             } catch (Exception e) {
                 if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
                     logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
@@ -55,12 +58,15 @@ public class MlRestTestStateCleaner {
                     logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
                 }
             }
-            client.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId);
+            int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode();
+            if (statusCode != 200) {
+                logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);
+            }
         }
     }

     private void deleteAllJobs() throws IOException {
-        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(client.performRequest("GET", "/_cluster/state",
+        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
                 Collections.singletonMap("filter_path", "metadata.ml.jobs")));
         @SuppressWarnings("unchecked")
         List<Map<String, Object>> jobConfigs =
@@ -72,7 +78,11 @@ public class MlRestTestStateCleaner {
         for (Map<String, Object> jobConfig : jobConfigs) {
             String jobId = (String) jobConfig.get("job_id");
             try {
-                client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close");
+                int statusCode = adminClient.performRequest("POST",
+                        "/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode();
+                if (statusCode != 200) {
+                    logger.error("Got status code " + statusCode + " when closing job " + jobId);
+                }
             } catch (Exception e) {
                 if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")
                         || e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) {
@@ -81,11 +91,17 @@ public class MlRestTestStateCleaner {
                     logger.warn("failed to close job [" + jobId + "]", e);
                 }
             }
-            client.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId);
+            int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode();
+            if (statusCode != 200) {
+                logger.error("Got status code " + statusCode + " when deleting job " + jobId);
+            }
         }
     }

     private void deleteDotML() throws IOException {
-        client.performRequest("DELETE", ".ml-*?ignore_unavailable=true");
+        int statusCode = adminClient.performRequest("DELETE", ".ml-*?ignore_unavailable=true").getStatusLine().getStatusCode();
+        if (statusCode != 200) {
+            logger.error("Got status code " + statusCode + " when deleting .ml-* indexes");
+        }
     }
 }
@@ -6,7 +6,7 @@
 package org.elasticsearch.xpack.ml.job;

 import org.elasticsearch.ResourceNotFoundException;
-import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -107,7 +107,8 @@ public class JobManagerTests extends ESTestCase {
     private JobManager createJobManager() {
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
-        return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor);
+        Client client = mock(Client.class);
+        return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, client);
     }

     private ClusterState createClusterState() {
@@ -304,7 +304,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<Bucket>[] holder = new QueryPage[1];
-        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
+        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
         QueryPage<Bucket> buckets = holder[0];
         assertEquals(1L, buckets.count());
         QueryBuilder query = queryBuilderHolder[0];
@@ -339,7 +339,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<Bucket>[] holder = new QueryPage[1];
-        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
+        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
         QueryPage<Bucket> buckets = holder[0];
         assertEquals(1L, buckets.count());
         QueryBuilder query = queryBuilderHolder[0];
@@ -378,7 +378,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<Bucket>[] holder = new QueryPage[1];
-        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
+        provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
         QueryPage<Bucket> buckets = holder[0];
         assertEquals(1L, buckets.count());
         QueryBuilder query = queryBuilderHolder[0];
@@ -402,7 +402,7 @@ public class JobProviderTests extends ESTestCase {
         BucketsQueryBuilder bq = new BucketsQueryBuilder();
         bq.timestamp(Long.toString(timestamp));
         Exception[] holder = new Exception[1];
-        provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;});
+        provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;}, client);
         assertEquals(ResourceNotFoundException.class, holder[0].getClass());
     }

@@ -427,7 +427,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<Bucket>[] bucketHolder = new QueryPage[1];
-        provider.buckets(jobId, bq.build(), q -> {bucketHolder[0] = q;}, e -> {});
+        provider.buckets(jobId, bq.build(), q -> {bucketHolder[0] = q;}, e -> {}, client);
         assertThat(bucketHolder[0].count(), equalTo(1L));
         Bucket b = bucketHolder[0].results().get(0);
         assertEquals(now, b.getTimestamp());
@@ -454,7 +454,7 @@ public class JobProviderTests extends ESTestCase {
         bq.timestamp(Long.toString(now.getTime()));

         Exception[] holder = new Exception[1];
-        provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;});
+        provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;}, client);
         assertEquals(ResourceNotFoundException.class, holder[0].getClass());
     }

@@ -495,7 +495,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
-        provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new);
+        provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new, client);
         QueryPage<AnomalyRecord> recordPage = holder[0];
         assertEquals(2L, recordPage.count());
         List<AnomalyRecord> records = recordPage.results();
@@ -552,7 +552,7 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
-        provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new);
+        provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new, client);
         QueryPage<AnomalyRecord> recordPage = holder[0];
         assertEquals(2L, recordPage.count());
         List<AnomalyRecord> records = recordPage.results();
@@ -599,7 +599,8 @@ public class JobProviderTests extends ESTestCase {

         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
-        provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, "", page -> holder[0] = page, RuntimeException::new);
+        provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, "", page -> holder[0] = page, RuntimeException::new,
+                client);
         QueryPage<AnomalyRecord> recordPage = holder[0];
         assertEquals(2L, recordPage.count());
         List<AnomalyRecord> records = recordPage.results();
@@ -635,7 +636,8 @@ public class JobProviderTests extends ESTestCase {
         JobProvider provider = createProvider(client);

         Integer[] holder = new Integer[1];
-        provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new);
+        provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new,
+                client);
         int records = holder[0];
         assertEquals(400L, records);
     }
@@ -664,7 +666,8 @@ public class JobProviderTests extends ESTestCase {
         JobProvider provider = createProvider(client);

         Integer[] holder = new Integer[1];
-        provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new);
+        provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new,
+                client);
         int records = holder[0];

         // This is not realistic, but is an artifact of the fact that the mock
@@ -694,7 +697,7 @@ public class JobProviderTests extends ESTestCase {
         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
         provider.categoryDefinitions(jobId, null, from, size, r -> {holder[0] = r;},
-                e -> {throw new RuntimeException(e);});
+                e -> {throw new RuntimeException(e);}, client);
         QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
         assertEquals(1L, categoryDefinitions.count());
         assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
@@ -717,7 +720,7 @@ public class JobProviderTests extends ESTestCase {
         @SuppressWarnings({"unchecked", "rawtypes"})
         QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
         provider.categoryDefinitions(jobId, categoryId, null, null,
-                r -> {holder[0] = r;}, e -> {throw new RuntimeException(e);});
+                r -> {holder[0] = r;}, e -> {throw new RuntimeException(e);}, client);
         QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
         assertEquals(1L, categoryDefinitions.count());
         assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
@ -761,7 +764,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<Influencer>[] holder = new QueryPage[1];
|
||||
InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).includeInterim(false).build();
|
||||
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new);
|
||||
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new, client);
|
||||
QueryPage<Influencer> page = holder[0];
|
||||
assertEquals(2L, page.count());
|
||||
|
||||
|
@ -824,7 +827,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
QueryPage<Influencer>[] holder = new QueryPage[1];
|
||||
InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).start("0").end("0").sortField("sort")
|
||||
.sortDescending(true).anomalyScoreThreshold(0.0).includeInterim(true).build();
|
||||
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new);
|
||||
provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new, client);
|
||||
QueryPage<Influencer> page = holder[0];
|
||||
assertEquals(2L, page.count());
|
||||
|
||||
|
|
|
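Aside: every JobProviderTests change above threads a client argument into the result lookups. The commit message explains why: result searches now go through a NodeClient rather than the InternalClient, so the caller's index/document level permissions apply. Below is a minimal, self-contained Java sketch of that pattern; SearchClient and ResultsProvider are invented names for illustration, not x-pack classes.

// Minimal sketch (invented names, not x-pack code) of the pattern behind the
// signature changes above: the provider no longer owns a single internal
// client, so each result lookup runs with whatever client - and therefore
// whatever security context - the calling action supplies.
interface SearchClient {
    String search(String index);
}

class ResultsProvider {
    String records(String jobId, SearchClient client) {
        // The search executes through the caller's client, so index and
        // document level permissions on ML results are respected.
        return client.search(".ml-anomalies-" + jobId);
    }
}

public class ClientPerCallSketch {
    public static void main(String[] args) {
        ResultsProvider provider = new ResultsProvider();
        SearchClient userClient = index -> "search on " + index + " as the calling user";
        System.out.println(provider.records("my-job", userClient));
    }
}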
@@ -28,7 +28,7 @@ public class XPackRestIT extends XPackRestTestCase {

 @After
 public void clearMlState() throws IOException {
-    new MlRestTestStateCleaner(logger, client(), this).clearMlMetadata();
+    new MlRestTestStateCleaner(logger, adminClient(), this).clearMlMetadata();
 }

 public XPackRestIT(ClientYamlTestCandidate testCandidate) {
@@ -107,12 +107,12 @@ cluster:admin/ingest/pipeline/get
 cluster:admin/ingest/pipeline/put
 cluster:admin/ingest/pipeline/simulate
 cluster:admin/ml/filters/get
-cluster:admin/ml/anomaly_detectors/results/categories/get
-cluster:admin/ml/anomaly_detectors/stats/get
-cluster:admin/ml/anomaly_detectors/results/buckets/get
-cluster:admin/ml/anomaly_detectors/model_snapshots/get
-cluster:admin/ml/anomaly_detectors/results/records/get
-cluster:admin/ml/anomaly_detectors/results/influencers/get
+cluster:monitor/ml/anomaly_detectors/results/categories/get
+cluster:monitor/ml/anomaly_detectors/stats/get
+cluster:monitor/ml/anomaly_detectors/results/buckets/get
+cluster:monitor/ml/anomaly_detectors/model_snapshots/get
+cluster:monitor/ml/anomaly_detectors/results/records/get
+cluster:monitor/ml/anomaly_detectors/results/influencers/get
 cluster:admin/ml/datafeeds/preview
 cluster:admin/ml/datafeeds/put
 cluster:admin/ml/datafeeds/update

@@ -126,18 +126,18 @@ cluster:admin/ml/anomaly_detectors/data/post
 cluster:admin/ml/anomaly_detectors/close
 cluster:admin/ml/filters/put
 cluster:admin/ml/anomaly_detectors/put
-cluster:admin/ml/anomaly_detectors/get
-cluster:admin/ml/datafeeds/get
+cluster:monitor/ml/anomaly_detectors/get
+cluster:monitor/ml/datafeeds/get
 cluster:admin/ml/anomaly_detectors/model_snapshots/update
 cluster:admin/ml/anomaly_detectors/flush
 cluster:admin/ml/filters/delete
-cluster:admin/ml/datafeeds/stats/get
+cluster:monitor/ml/datafeeds/stats/get
 cluster:admin/ml/datafeeds/stop
 cluster:admin/ml/datafeeds/start
 cluster:admin/ml/anomaly_detectors/open
-cluster:admin/ml/job/update
-indices:data/write/delete/mlbyquery
-cluster:admin/ml/job/update/process
+cluster:admin/ml/anomaly_detectors/update
+indices:internal/data/write/mldeletebyquery
+internal:admin/ml/anomaly_detectors/update/process
 cluster:admin/ml/delete_expired_data
 cluster:admin/persistent/create
 cluster:admin/persistent/start
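The renames above are the heart of the privilege split: read-only ML actions now live under cluster:monitor/ml/ while mutating ones stay under cluster:admin/ml/, so a read-only privilege can be expressed as a single wildcard (per the commit message, monitor_ml covers the read-only subset and manage_ml covers both). The toy Java matcher below illustrates the idea; it is a simplified stand-in for X-Pack's real privilege resolution, not the actual implementation.

import java.util.List;
import java.util.regex.Pattern;

// Toy wildcard matcher showing why moving read-only ML actions under
// "cluster:monitor/ml/" lets one pattern define a read-only privilege.
public class MlActionPatternSketch {
    static boolean matches(String wildcard, String action) {
        // Translate a simple "*" wildcard into a regex and test the action name.
        String regex = ("\\Q" + wildcard + "\\E").replace("*", "\\E.*\\Q");
        return Pattern.matches(regex, action);
    }

    public static void main(String[] args) {
        String monitorMl = "cluster:monitor/ml/*";   // read-only subset
        List<String> actions = List.of(
                "cluster:monitor/ml/anomaly_detectors/results/buckets/get",
                "cluster:admin/ml/anomaly_detectors/put");
        for (String action : actions) {
            // prints true for the monitor action, false for the admin action
            System.out.println(action + " -> " + matches(monitorMl, action));
        }
    }
}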
@@ -107,7 +107,7 @@ setup:
           anomaly_score: "80.0"

 ---
-"Test mutually-exclusive params (via body)":
+"Test mutually-exclusive params via body":
   - do:
       catch: request
       xpack.ml.get_buckets:

@@ -69,7 +69,7 @@ setup:
           size: 1

 ---
-"Test with invalid param combinations (via body)":
+"Test with invalid param combinations via body":
   - do:
       catch: request
       xpack.ml.get_categories:

@@ -57,7 +57,7 @@
   - match: { acknowledged: true }

 ---
-"Test job config that's invalid only because of the job ID":
+"Test job config that is invalid only because of the job ID":
   - do:
       catch: /Invalid job_id; '_' must be lowercase alphanumeric, may contain hyphens or underscores, may not start with underscore/
       xpack.ml.validate:
@@ -0,0 +1,82 @@
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'

dependencies {
    testCompile project(path: ':x-pack-elasticsearch:plugin', configuration: 'runtime')
}

// bring in machine learning rest test suite
task copyMlRestTests(type: Copy) {
    into project.sourceSets.test.output.resourcesDir
    from project(':x-pack-elasticsearch:plugin').sourceSets.test.resources.srcDirs
    include 'rest-api-spec/test/ml/**'
}

integTest {
    dependsOn copyMlRestTests
}

// remove tests that are expected to throw an exception, because we cannot then
// know whether to expect an authorization exception or a validation exception
integTestRunner {
    systemProperty 'tests.rest.blacklist', [
            'ml/datafeeds_crud/Test delete datafeed with missing id',
            'ml/datafeeds_crud/Test put datafeed referring to missing job_id',
            'ml/datafeeds_crud/Test put datafeed with invalid query',
            'ml/datafeeds_crud/Test update datafeed with missing id',
            'ml/delete_model_snapshot/Test delete snapshot missing snapshotId',
            'ml/delete_model_snapshot/Test delete snapshot missing job_id',
            'ml/delete_model_snapshot/Test delete with in-use model',
            'ml/filter_crud/Test create filter api without ID',
            'ml/filter_crud/Test get filter API with bad ID',
            'ml/filter_crud/Test invalid param combinations',
            'ml/filter_crud/Test non-existing filter',
            'ml/get_datafeed_stats/Test get datafeed stats given missing datafeed_id',
            'ml/get_datafeeds/Test get datafeed given missing datafeed_id',
            'ml/jobs_crud/Test get job API with non existing job id',
            'ml/jobs_crud/Test put job with inconsistent body/param ids',
            'ml/jobs_get/Test get job given missing job_id',
            'ml/jobs_get_result_buckets/Test mutually-exclusive params',
            'ml/jobs_get_result_buckets/Test mutually-exclusive params via body',
            'ml/jobs_get_result_categories/Test with invalid param combinations',
            'ml/jobs_get_result_categories/Test with invalid param combinations via body',
            'ml/jobs_get_stats/Test get job stats given missing job',
            'ml/post_data/Test Flush data with invalid parameters',
            'ml/post_data/Test flushing, posting and closing a closed job',
            'ml/post_data/Test open and close with non-existent job id',
            'ml/post_data/Test POST data with invalid parameters',
            'ml/preview_datafeed/Test preview missing datafeed',
            'ml/revert_model_snapshot/Test revert model with invalid snapshotId',
            'ml/start_stop_datafeed/Test start datafeed job, but not open',
            'ml/start_stop_datafeed/Test start non existing datafeed',
            'ml/start_stop_datafeed/Test stop already stopped datafeed job',
            'ml/start_stop_datafeed/Test stop non existing datafeed',
            'ml/update_model_snapshot/Test without description',
            'ml/validate/Test invalid job config',
            'ml/validate/Test job config that is invalid only because of the job ID',
            'ml/validate_detector/Test invalid detector'
    ].join(',')
}

integTestCluster {
    plugin ':x-pack-elasticsearch:plugin'
    extraConfigFile 'x-pack/roles.yml', 'roles.yml'
    setupCommand 'setupTestAdminUser',
            'bin/x-pack/users', 'useradd', 'test_admin', '-p', 'changeme', '-r', 'superuser'
    setupCommand 'setupMlAdminUser',
            'bin/x-pack/users', 'useradd', 'ml_admin', '-p', 'changeme', '-r', 'minimal,machine_learning_admin'
    setupCommand 'setupMlUserUser',
            'bin/x-pack/users', 'useradd', 'ml_user', '-p', 'changeme', '-r', 'minimal,machine_learning_user'
    setupCommand 'setupPowerlessUser',
            'bin/x-pack/users', 'useradd', 'no_ml', '-p', 'changeme', '-r', 'minimal'
    waitCondition = { node, ant ->
        File tmpFile = new File(node.cwd, 'wait.success')
        ant.get(src: "http://${node.httpUri()}",
                dest: tmpFile.toString(),
                username: 'test_admin',
                password: 'changeme',
                ignoreerrors: true,
                retries: 10)
        return tmpFile.exists()
    }
}
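Note that the waitCondition above authenticates as test_admin: with security enabled, an anonymous availability probe would be rejected even once the node is up. As a rough, self-contained Java 11 sketch of what that authenticated poll amounts to (localhost:9200 is an assumed address; the Gradle plugin substitutes the real node URI):

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

// Hedged sketch, not the Gradle plugin's code: poll the node's HTTP port
// with basic-auth credentials until it answers.
public class WaitForClusterSketch {
    public static void main(String[] args) throws InterruptedException {
        String token = Base64.getEncoder().encodeToString("test_admin:changeme".getBytes());
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:9200"))
                .header("Authorization", "Basic " + token).build();
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200) {
                    System.out.println("cluster is up");
                    return;
                }
            } catch (IOException e) {
                // node not listening yet; retry
            }
            Thread.sleep(1000);
        }
        System.out.println("cluster did not come up");
    }
}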
@@ -0,0 +1,15 @@
minimal:
  cluster:
    - cluster:monitor/health
    - cluster:monitor/main
  indices:
    - names: '*'
      privileges:
        - indices:admin/create
        - indices:admin/exists
        - indices:admin/get
        - indices:admin/mapping/put
        - indices:admin/refresh
        - indices:data/read/search
        - indices:data/write/bulk
        - indices:data/write/index
@@ -0,0 +1,108 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.smoketest;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.http.HttpStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;

public class MlWithSecurityIT extends ESClientYamlSuiteTestCase {

    private static final String TEST_ADMIN_USERNAME = "test_admin";
    private static final String TEST_ADMIN_PASSWORD = "changeme";

    @After
    public void clearMlState() throws IOException {
        new MlRestTestStateCleaner(logger, adminClient(), this).clearMlMetadata();
    }

    public MlWithSecurityIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
    }

    /**
     * Waits for the Security and .ml-anomalies templates to be created by the {@link SecurityLifecycleService}
     * and {@link MachineLearningTemplateRegistry}.
     */
    @Before
    public void waitForIndexTemplates() throws Exception {
        String templateApi = "indices.exists_template";
        Map<String, String> securityParams = Collections.singletonMap("name", SecurityLifecycleService.SECURITY_TEMPLATE_NAME);
        Map<String, String> anomaliesParams = Collections.singletonMap("name", AnomalyDetectorsIndex.jobResultsIndexPrefix());
        Map<String, String> headers = Collections.singletonMap("Authorization",
                basicAuthHeaderValue(TEST_ADMIN_USERNAME, new SecuredString(TEST_ADMIN_PASSWORD.toCharArray())));

        for (Map<String, String> params : Arrays.asList(securityParams, anomaliesParams)) {
            AtomicReference<IOException> exceptionHolder = new AtomicReference<>();
            awaitBusy(() -> {
                try {
                    ClientYamlTestResponse response = getAdminExecutionContext().callApi(templateApi, params, Collections.emptyList(),
                            headers);
                    if (response.getStatusCode() == HttpStatus.SC_OK) {
                        exceptionHolder.set(null);
                        return true;
                    }
                } catch (IOException e) {
                    exceptionHolder.set(e);
                }
                return false;
            });

            IOException exception = exceptionHolder.get();
            if (exception != null) {
                throw new IllegalStateException("Exception when waiting for index template to be created", exception);
            }
        }
    }

    @ParametersFactory
    public static Iterable<Object[]> parameters() throws IOException {
        return ESClientYamlSuiteTestCase.createParameters();
    }

    protected String[] getCredentials() {
        return new String[]{"ml_admin", "changeme"};
    }

    @Override
    protected Settings restClientSettings() {
        String[] creds = getCredentials();
        String token = basicAuthHeaderValue(creds[0], new SecuredString(creds[1].toCharArray()));
        return Settings.builder()
                .put(ThreadContext.PREFIX + ".Authorization", token)
                .build();
    }

    @Override
    protected Settings restAdminSettings() {
        String token = basicAuthHeaderValue(TEST_ADMIN_USERNAME, new SecuredString(TEST_ADMIN_PASSWORD.toCharArray()));
        return Settings.builder()
                .put(ThreadContext.PREFIX + ".Authorization", token)
                .build();
    }
}
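MlWithSecurityIT runs the whole ML YAML suite as ml_admin, and the two subclasses below replay it as no_ml and ml_user simply by overriding getCredentials(). A toy Java sketch of that override pattern, independent of the test framework:

import java.util.Base64;

// Toy illustration (not the test framework's code): the base class turns
// getCredentials() into an Authorization header for the REST client, and a
// subclass only swaps in a different user to rerun identical tests under
// another role.
public class CredentialsOverrideSketch {
    static class Base {
        protected String[] getCredentials() {
            return new String[]{"ml_admin", "changeme"};
        }

        String authorizationHeader() {
            String[] creds = getCredentials();
            String token = Base64.getEncoder().encodeToString((creds[0] + ":" + creds[1]).getBytes());
            return "Basic " + token;
        }
    }

    static class AsMlUser extends Base {
        @Override
        protected String[] getCredentials() {
            return new String[]{"ml_user", "changeme"};
        }
    }

    public static void main(String[] args) {
        System.out.println(new Base().authorizationHeader());      // runs as ml_admin
        System.out.println(new AsMlUser().authorizationHeader());  // same tests, ml_user role
    }
}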
@@ -0,0 +1,37 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.smoketest;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;

import java.io.IOException;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;

public class MlWithSecurityInsufficientRoleIT extends MlWithSecurityIT {

    public MlWithSecurityInsufficientRoleIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
    }

    @Override
    public void test() throws IOException {
        AssertionError ae = expectThrows(AssertionError.class, super::test);
        assertThat(ae.getMessage(),
                either(containsString("action [cluster:monitor/ml")).or(containsString("action [cluster:admin/ml")));
        assertThat(ae.getMessage(), containsString("returned [403 Forbidden]"));
        assertThat(ae.getMessage(), containsString("is unauthorized for user [no_ml]"));
    }

    @Override
    protected String[] getCredentials() {
        return new String[]{"no_ml", "changeme"};
    }
}
@@ -0,0 +1,54 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.smoketest;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.section.DoSection;
import org.elasticsearch.test.rest.yaml.section.ExecutableSection;

import java.io.IOException;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;

public class MlWithSecurityUserRoleIT extends MlWithSecurityIT {

    private final ClientYamlTestCandidate testCandidate;

    public MlWithSecurityUserRoleIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
        this.testCandidate = testCandidate;
    }

    @Override
    public void test() throws IOException {
        try {
            super.test();

            // We should have got here if and only if the test consisted entirely of GETs
            for (ExecutableSection section : testCandidate.getTestSection().getExecutableSections()) {
                if (section instanceof DoSection) {
                    if (((DoSection) section).getApiCallSection().getApi().startsWith("xpack.ml.get_") == false) {
                        fail("should have failed because of missing role");
                    }
                }
            }
        } catch (AssertionError ae) {
            assertThat(ae.getMessage(),
                    either(containsString("action [cluster:monitor/ml")).or(containsString("action [cluster:admin/ml")));
            assertThat(ae.getMessage(), containsString("returned [403 Forbidden]"));
            assertThat(ae.getMessage(), containsString("is unauthorized for user [ml_user]"));
        }
    }

    @Override
    protected String[] getCredentials() {
        return new String[]{"ml_user", "changeme"};
    }
}
@@ -0,0 +1,107 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MlRestTestStateCleaner {

    private final Logger logger;
    private final RestClient adminClient;
    private final ESRestTestCase testCase;

    public MlRestTestStateCleaner(Logger logger, RestClient adminClient, ESRestTestCase testCase) {
        this.logger = logger;
        this.adminClient = adminClient;
        this.testCase = testCase;
    }

    public void clearMlMetadata() throws IOException {
        deleteAllDatafeeds();
        deleteAllJobs();
        deleteDotML();
    }

    @SuppressWarnings("unchecked")
    private void deleteAllDatafeeds() throws IOException {
        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
                Collections.singletonMap("filter_path", "metadata.ml.datafeeds")));
        List<Map<String, Object>> datafeeds =
                (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateAsMap);
        if (datafeeds == null) {
            return;
        }

        for (Map<String, Object> datafeed : datafeeds) {
            String datafeedId = (String) datafeed.get("datafeed_id");
            try {
                int statusCode = adminClient.performRequest("POST",
                        "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
                }
            } catch (Exception e) {
                if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
                    logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
                } else {
                    logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
                }
            }
            int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode();
            if (statusCode != 200) {
                logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);
            }
        }
    }

    private void deleteAllJobs() throws IOException {
        Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
                Collections.singletonMap("filter_path", "metadata.ml.jobs")));
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> jobConfigs =
                (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", clusterStateAsMap);
        if (jobConfigs == null) {
            return;
        }

        for (Map<String, Object> jobConfig : jobConfigs) {
            String jobId = (String) jobConfig.get("job_id");
            try {
                int statusCode = adminClient.performRequest("POST",
                        "/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    logger.error("Got status code " + statusCode + " when closing job " + jobId);
                }
            } catch (Exception e) {
                if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")
                        || e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) {
                    logger.debug("job [" + jobId + "] has already been closed", e);
                } else {
                    logger.warn("failed to close job [" + jobId + "]", e);
                }
            }
            int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode();
            if (statusCode != 200) {
                logger.error("Got status code " + statusCode + " when deleting job " + jobId);
            }
        }
    }

    private void deleteDotML() throws IOException {
        int statusCode = adminClient.performRequest("DELETE", ".ml-*?ignore_unavailable=true").getStatusLine().getStatusCode();
        if (statusCode != 200) {
            logger.error("Got status code " + statusCode + " when deleting .ml-* indexes");
        }
    }
}