[ML] Check underlying index permissions on datafeed PUT (elastic/x-pack-elasticsearch#809)
Also added a test to ensure underlying index permissions are required for datafeed preview. Relates elastic/x-pack-elasticsearch#648 Original commit: elastic/x-pack-elasticsearch@6edadbb401
This commit is contained in:
parent
93d7b8c14b
commit
7756067e5d
|
@ -215,6 +215,9 @@ public class PreviewDatafeedAction extends Action<PreviewDatafeedAction.Request,
|
|||
}
|
||||
DatafeedConfig.Builder datafeedWithAutoChunking = new DatafeedConfig.Builder(datafeed);
|
||||
datafeedWithAutoChunking.setChunkingConfig(ChunkingConfig.newAuto());
|
||||
// NB: this is using the client from the transport layer, NOT the internal client.
|
||||
// This is important because it means the datafeed search will fail if the user
|
||||
// requesting the preview doesn't have permission to search the relevant indices.
|
||||
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job);
|
||||
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, System.currentTimeMillis());
|
||||
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
|
||||
|
|
|
@ -8,11 +8,13 @@ package org.elasticsearch.xpack.ml.action;
|
|||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -21,22 +23,32 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.XPackSettings;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.security.action.user.HasPrivilegesAction;
|
||||
import org.elasticsearch.xpack.security.action.user.HasPrivilegesRequest;
|
||||
import org.elasticsearch.xpack.security.action.user.HasPrivilegesResponse;
|
||||
import org.elasticsearch.xpack.security.authz.RoleDescriptor;
|
||||
import org.elasticsearch.xpack.security.support.Exceptions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -180,14 +192,19 @@ public class PutDatafeedAction extends Action<PutDatafeedAction.Request, PutData
|
|||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
private final XPackLicenseState licenseState;
|
||||
private final Client client;
|
||||
private final boolean securityEnabled;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, XPackLicenseState licenseState, ActionFilters actionFilters,
|
||||
public TransportAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, ThreadPool threadPool, Client client,
|
||||
XPackLicenseState licenseState, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, PutDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
super(settings, PutDatafeedAction.NAME, transportService, clusterService, threadPool,
|
||||
actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.licenseState = licenseState;
|
||||
this.client = client;
|
||||
this.securityEnabled = XPackSettings.SECURITY_ENABLED.get(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -201,8 +218,58 @@ public class PutDatafeedAction extends Action<PutDatafeedAction.Request, PutData
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
clusterService.submitStateUpdateTask("put-datafeed-" + request.getDatafeed().getId(),
|
||||
protected void masterOperation(Request request, ClusterState state,
|
||||
ActionListener<Response> listener) throws Exception {
|
||||
// If security is enabled only create the datafeed if the user requesting creation has
|
||||
// permission to read the indices the datafeed is going to read from
|
||||
if (securityEnabled) {
|
||||
String username = new SecurityContext(settings,
|
||||
threadPool.getThreadContext()).getUser().principal();
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, request, r, listener),
|
||||
listener::onFailure);
|
||||
|
||||
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
|
||||
privRequest.username(username);
|
||||
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
|
||||
// We just check for permission to use the search action. In reality we'll also
|
||||
// use the scroll action, but that's considered an implementation detail.
|
||||
privRequest.indexPrivileges(RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(request.getDatafeed().getIndexes()
|
||||
.toArray(new String[0]))
|
||||
.privileges(SearchAction.NAME)
|
||||
.build());
|
||||
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
} else {
|
||||
putDatafeed(request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void handlePrivsResponse(String username, Request request,
|
||||
HasPrivilegesResponse response,
|
||||
ActionListener<Response> listener) throws IOException {
|
||||
if (response.isCompleteMatch()) {
|
||||
putDatafeed(request, listener);
|
||||
} else {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.startObject();
|
||||
for (HasPrivilegesResponse.IndexPrivileges index : response.getIndexPrivileges()) {
|
||||
builder.field(index.getIndex());
|
||||
builder.map(index.getPrivileges());
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" +
|
||||
" because user {} lacks permissions on the indexes to be" +
|
||||
" searched: {}",
|
||||
request.getDatafeed().getId(), username, builder.string()));
|
||||
}
|
||||
}
|
||||
|
||||
private void putDatafeed(Request request, ActionListener<Response> listener) {
|
||||
clusterService.submitStateUpdateTask(
|
||||
"put-datafeed-" + request.getDatafeed().getId(),
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
|
||||
@Override
|
||||
|
@ -210,11 +277,13 @@ public class PutDatafeedAction extends Action<PutDatafeedAction.Request, PutData
|
|||
if (acknowledged) {
|
||||
logger.info("Created datafeed [{}]", request.getDatafeed().getId());
|
||||
}
|
||||
return new Response(acknowledged, request.getDatafeed());
|
||||
return new Response(acknowledged,
|
||||
request.getDatafeed());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterState execute(ClusterState currentState)
|
||||
throws Exception {
|
||||
return putDatafeed(request, currentState);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.integration;
|
|||
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
|
@ -31,12 +32,15 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
|
||||
public class DatafeedJobIT extends ESRestTestCase {
|
||||
|
||||
|
||||
private static final String BASIC_AUTH_VALUE = basicAuthHeaderValue("elastic", new SecuredString("changeme".toCharArray()));
|
||||
private static final String BASIC_AUTH_VALUE_ELASTIC =
|
||||
basicAuthHeaderValue("elastic", new SecuredString("changeme".toCharArray()));
|
||||
private static final String BASIC_AUTH_VALUE_ML_ADMIN =
|
||||
basicAuthHeaderValue("ml_admin", new SecuredString("changeme".toCharArray()));
|
||||
|
||||
@Override
|
||||
protected Settings restClientSettings() {
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE).build();
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization",
|
||||
BASIC_AUTH_VALUE_ELASTIC).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,6 +51,16 @@ public class DatafeedJobIT extends ESRestTestCase {
|
|||
@Before
|
||||
public void setUpData() throws Exception {
|
||||
|
||||
// This user has admin rights on machine learning, but (importantly for the tests) no
|
||||
// rights on any of the data indexes
|
||||
String user = "{"
|
||||
+ " \"password\" : \"changeme\","
|
||||
+ " \"roles\" : [ \"machine_learning_admin\" ]"
|
||||
+ "}";
|
||||
|
||||
client().performRequest("put", "_xpack/security/user/ml_admin", Collections.emptyMap(),
|
||||
new StringEntity(user, ContentType.APPLICATION_JSON));
|
||||
|
||||
String mappings = "{"
|
||||
+ " \"mappings\": {"
|
||||
+ " \"response\": {"
|
||||
|
@ -229,6 +243,57 @@ public class DatafeedJobIT extends ESRestTestCase {
|
|||
.execute();
|
||||
}
|
||||
|
||||
public void testInsufficientSearchPrivilegesOnPut() throws Exception {
|
||||
String jobId = "privs-put-job";
|
||||
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
|
||||
+ "\"summary_count_field_name\":\"doc_count\","
|
||||
+ "\"detectors\":[{\"function\":\"mean\","
|
||||
+ "\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
|
||||
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
|
||||
+ "}";
|
||||
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
|
||||
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
|
||||
|
||||
String datafeedId = "datafeed-" + jobId;
|
||||
// This should be disallowed, because even though the ml_admin user has permission to
|
||||
// create a datafeed they DON'T have permission to search the index the datafeed is
|
||||
// configured to read
|
||||
ResponseException e = expectThrows(ResponseException.class, () ->
|
||||
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response")
|
||||
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN)
|
||||
.build());
|
||||
|
||||
assertThat(e.getMessage(), containsString("Cannot create datafeed"));
|
||||
assertThat(e.getMessage(),
|
||||
containsString("user ml_admin lacks permissions on the indexes to be searched"));
|
||||
}
|
||||
|
||||
public void testInsufficientSearchPrivilegesOnPreview() throws Exception {
|
||||
String jobId = "privs-preview-job";
|
||||
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
|
||||
+ "\"summary_count_field_name\":\"doc_count\","
|
||||
+ "\"detectors\":[{\"function\":\"mean\","
|
||||
+ "\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
|
||||
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
|
||||
+ "}";
|
||||
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
|
||||
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
|
||||
|
||||
String datafeedId = "datafeed-" + jobId;
|
||||
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").build();
|
||||
|
||||
// This should be disallowed, because ml_admin is trying to preview a datafeed created by
|
||||
// by another user (elastic in this case) that will reveal the content of an index they
|
||||
// don't have permission to search directly
|
||||
ResponseException e = expectThrows(ResponseException.class, () ->
|
||||
client().performRequest("get",
|
||||
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_preview",
|
||||
new BasicHeader("Authorization", BASIC_AUTH_VALUE_ML_ADMIN)));
|
||||
|
||||
assertThat(e.getMessage(),
|
||||
containsString("[indices:data/read/search] is unauthorized for user [ml_admin]"));
|
||||
}
|
||||
|
||||
public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
|
||||
String jobId = "aggs-histogram-job";
|
||||
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
|
||||
|
@ -474,6 +539,7 @@ public class DatafeedJobIT extends ESRestTestCase {
|
|||
boolean source;
|
||||
String scriptedFields;
|
||||
String aggregations;
|
||||
String authHeader = BASIC_AUTH_VALUE_ELASTIC;
|
||||
|
||||
DatafeedBuilder(String datafeedId, String jobId, String index, String type) {
|
||||
this.datafeedId = datafeedId;
|
||||
|
@ -497,6 +563,11 @@ public class DatafeedJobIT extends ESRestTestCase {
|
|||
return this;
|
||||
}
|
||||
|
||||
DatafeedBuilder setAuthHeader(String authHeader) {
|
||||
this.authHeader = authHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
Response build() throws IOException {
|
||||
String datafeedConfig = "{"
|
||||
+ "\"job_id\": \"" + jobId + "\",\"indexes\":[\"" + index + "\"],\"types\":[\"" + type + "\"]"
|
||||
|
@ -505,7 +576,8 @@ public class DatafeedJobIT extends ESRestTestCase {
|
|||
+ (aggregations == null ? "" : ",\"aggs\":" + aggregations)
|
||||
+ "}";
|
||||
return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
|
||||
new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON));
|
||||
new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON),
|
||||
new BasicHeader("Authorization", authHeader));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue