* [ML] prefer secondary authorization header for data[feed|frame] authz (#54121) Secondary authorization headers are to be used to facilitate Kibana spaces support + ML jobs/datafeeds. Now on PUT/Update/Preview datafeed, and PUT data frame analytics the secondary authorization is preferred over the primary (if provided). closes https://github.com/elastic/elasticsearch/issues/53801 * fixing for backport
This commit is contained in:
parent
20d67720aa
commit
65233383f6
|
@ -36,6 +36,12 @@ to create or update it. If the two sets of roles differ then the preview may
|
|||
not accurately reflect what the {dfeed} will return when started. To avoid
|
||||
such problems, the same user that creates/updates the {dfeed} should preview
|
||||
it to ensure it is returning the expected data.
|
||||
+
|
||||
--
|
||||
NOTE: It is possible that secondary authorization headers are supplied in the
|
||||
request. If this is the case, the secondary authorization headers are used
|
||||
instead of the primary headers.
|
||||
--
|
||||
|
||||
[[ml-preview-datafeed-path-parms]]
|
||||
==== {api-path-parms-title}
|
||||
|
|
|
@ -42,6 +42,9 @@ each interval. See {ml-docs}/ml-delayed-data-detection.html[Handling delayed dat
|
|||
* When {es} {security-features} are enabled, your {dfeed} remembers which roles
|
||||
the user who created it had at the time of creation and runs the query using
|
||||
those same roles.
|
||||
* It is possible that secondary authorization headers are supplied in the
|
||||
request. If this is the case, the secondary authorization headers are used
|
||||
instead of the primary headers.
|
||||
====
|
||||
|
||||
[[ml-put-datafeed-path-parms]]
|
||||
|
|
|
@ -35,6 +35,12 @@ IMPORTANT: When {es} {security-features} are enabled, your {dfeed} remembers
|
|||
which roles the user who updated it had at the time of update and runs the query
|
||||
using those same roles.
|
||||
|
||||
+
|
||||
--
|
||||
NOTE: It is possible that secondary authorization headers are supplied in the
|
||||
request. If this is the case, the secondary authorization headers are used
|
||||
instead of the primary headers.
|
||||
--
|
||||
[[ml-update-datafeed-path-parms]]
|
||||
==== {api-path-parms-title}
|
||||
|
||||
|
|
|
@ -32,6 +32,12 @@ If the {es} {security-features} are enabled, you must have the following built-i
|
|||
|
||||
For more information, see <<security-privileges>> and <<built-in-roles>>.
|
||||
|
||||
+
|
||||
--
|
||||
NOTE: It is possible that secondary authorization headers are supplied in the
|
||||
request. If this is the case, the secondary authorization headers are used
|
||||
instead of the primary headers.
|
||||
--
|
||||
|
||||
[[ml-put-dfanalytics-desc]]
|
||||
==== {api-description-title}
|
||||
|
|
|
@ -544,6 +544,46 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
|
|||
containsString("[indices:data/read/field_caps] is unauthorized for user [ml_admin]"));
|
||||
}
|
||||
|
||||
public void testSecondaryAuthSearchPrivilegesLookBack() throws Exception {
|
||||
setupDataAccessRole("airline-data");
|
||||
String jobId = "secondary-privs-put-job";
|
||||
createJob(jobId, "airline.keyword");
|
||||
String datafeedId = "datafeed-" + jobId;
|
||||
// Primary auth header does not have access, but secondary auth does
|
||||
new DatafeedBuilder(datafeedId, jobId, "airline-data")
|
||||
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN)
|
||||
.setSecondaryAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
|
||||
.build();
|
||||
openJob(client(), jobId);
|
||||
|
||||
startDatafeedAndWaitUntilStopped(datafeedId);
|
||||
waitUntilJobIsClosed(jobId);
|
||||
|
||||
Response jobStatsResponse = client().performRequest(new Request("GET",
|
||||
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
|
||||
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
|
||||
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
|
||||
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
|
||||
}
|
||||
|
||||
public void testSecondaryAuthSearchPrivilegesOnPreview() throws Exception {
|
||||
setupDataAccessRole("airline-data");
|
||||
String jobId = "secondary-privs-preview-job";
|
||||
createJob(jobId, "airline.keyword");
|
||||
|
||||
String datafeedId = "datafeed-" + jobId;
|
||||
new DatafeedBuilder(datafeedId, jobId, "airline-data").build();
|
||||
|
||||
Request getFeed = new Request("GET", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_preview");
|
||||
RequestOptions.Builder options = getFeed.getOptions().toBuilder();
|
||||
options.addHeader("Authorization", BASIC_AUTH_VALUE_ML_ADMIN);
|
||||
options.addHeader("es-secondary-authorization", BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS);
|
||||
getFeed.setOptions(options);
|
||||
// Should not fail as secondary auth has permissions.
|
||||
client().performRequest(getFeed);
|
||||
}
|
||||
|
||||
public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
|
||||
String jobId = "aggs-histogram-job";
|
||||
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
|
||||
|
@ -1181,6 +1221,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
|
|||
String scriptedFields;
|
||||
String aggregations;
|
||||
String authHeader = BASIC_AUTH_VALUE_SUPER_USER;
|
||||
String secondaryAuthHeader = null;
|
||||
String chunkingTimespan;
|
||||
String indicesOptions;
|
||||
|
||||
|
@ -1210,6 +1251,11 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
|
|||
return this;
|
||||
}
|
||||
|
||||
DatafeedBuilder setSecondaryAuthHeader(String authHeader) {
|
||||
this.secondaryAuthHeader = authHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
DatafeedBuilder setChunkingTimespan(String timespan) {
|
||||
chunkingTimespan = timespan;
|
||||
return this;
|
||||
|
@ -1233,6 +1279,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
|
|||
+ "}");
|
||||
RequestOptions.Builder options = request.getOptions().toBuilder();
|
||||
options.addHeader("Authorization", authHeader);
|
||||
if (this.secondaryAuthHeader != null) {
|
||||
options.addHeader("es-secondary-authorization", secondaryAuthHeader);
|
||||
}
|
||||
request.setOptions(options);
|
||||
return client().performRequest(request);
|
||||
}
|
||||
|
|
|
@ -11,15 +11,18 @@ import org.elasticsearch.action.support.HandledTransportAction;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
|
||||
|
@ -34,6 +37,8 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
|
||||
|
||||
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -42,9 +47,10 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|||
private final DatafeedConfigProvider datafeedConfigProvider;
|
||||
private final JobResultsProvider jobResultsProvider;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final SecurityContext securityContext;
|
||||
|
||||
@Inject
|
||||
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
|
||||
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
|
||||
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
|
||||
NamedXContentRegistry xContentRegistry) {
|
||||
|
@ -55,6 +61,8 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|||
this.datafeedConfigProvider = datafeedConfigProvider;
|
||||
this.jobResultsProvider = jobResultsProvider;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
|
||||
new SecurityContext(settings, threadPool.getThreadContext()) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -65,37 +73,39 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|||
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
|
||||
jobBuilder -> {
|
||||
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
|
||||
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
previewDatafeed.setHeaders(headers);
|
||||
jobResultsProvider.datafeedTimingStats(
|
||||
jobBuilder.getId(),
|
||||
timingStats -> {
|
||||
// NB: this is using the client from the transport layer, NOT the internal client.
|
||||
// This is important because it means the datafeed search will fail if the user
|
||||
// requesting the preview doesn't have permission to search the relevant indices.
|
||||
DataExtractorFactory.create(
|
||||
client,
|
||||
previewDatafeed.build(),
|
||||
jobBuilder.build(),
|
||||
xContentRegistry,
|
||||
// Fake DatafeedTimingStatsReporter that does not have access to results index
|
||||
new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
|
||||
new ActionListener<DataExtractorFactory>() {
|
||||
@Override
|
||||
public void onResponse(DataExtractorFactory dataExtractorFactory) {
|
||||
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
|
||||
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
|
||||
}
|
||||
useSecondaryAuthIfAvailable(securityContext, () -> {
|
||||
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
previewDatafeed.setHeaders(headers);
|
||||
jobResultsProvider.datafeedTimingStats(
|
||||
jobBuilder.getId(),
|
||||
timingStats -> {
|
||||
// NB: this is using the client from the transport layer, NOT the internal client.
|
||||
// This is important because it means the datafeed search will fail if the user
|
||||
// requesting the preview doesn't have permission to search the relevant indices.
|
||||
DataExtractorFactory.create(
|
||||
client,
|
||||
previewDatafeed.build(),
|
||||
jobBuilder.build(),
|
||||
xContentRegistry,
|
||||
// Fake DatafeedTimingStatsReporter that does not have access to results index
|
||||
new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
|
||||
new ActionListener<DataExtractorFactory>() {
|
||||
@Override
|
||||
public void onResponse(DataExtractorFactory dataExtractorFactory) {
|
||||
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
|
||||
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
},
|
||||
listener::onFailure);
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
},
|
||||
listener::onFailure);
|
||||
});
|
||||
},
|
||||
listener::onFailure));
|
||||
},
|
||||
|
|
|
@ -57,6 +57,8 @@ import java.time.Instant;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
|
||||
|
||||
public class TransportPutDataFrameAnalyticsAction
|
||||
extends TransportMasterNodeAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
|
||||
|
||||
|
@ -140,27 +142,29 @@ public class TransportPutDataFrameAnalyticsAction
|
|||
.build();
|
||||
|
||||
if (licenseState.isAuthAllowed()) {
|
||||
final String username = securityContext.getUser().principal();
|
||||
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(preparedForPutConfig.getSource().getIndex())
|
||||
.privileges("read")
|
||||
.build();
|
||||
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(preparedForPutConfig.getDest().getIndex())
|
||||
.privileges("read", "index", "create_index")
|
||||
.build();
|
||||
useSecondaryAuthIfAvailable(securityContext, () -> {
|
||||
final String username = securityContext.getUser().principal();
|
||||
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(preparedForPutConfig.getSource().getIndex())
|
||||
.privileges("read")
|
||||
.build();
|
||||
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(preparedForPutConfig.getDest().getIndex())
|
||||
.privileges("read", "index", "create_index")
|
||||
.build();
|
||||
|
||||
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
|
||||
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
|
||||
privRequest.username(username);
|
||||
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
|
||||
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
|
||||
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
|
||||
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
|
||||
privRequest.username(username);
|
||||
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
|
||||
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
|
||||
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, preparedForPutConfig, r, listener),
|
||||
listener::onFailure);
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, preparedForPutConfig, r, listener),
|
||||
listener::onFailure);
|
||||
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
});
|
||||
} else {
|
||||
updateDocMappingAndPutConfig(
|
||||
preparedForPutConfig,
|
||||
|
|
|
@ -61,6 +61,7 @@ import java.util.Map;
|
|||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
|
||||
|
||||
public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
|
||||
|
||||
|
@ -106,52 +107,52 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
|
|||
// If security is enabled only create the datafeed if the user requesting creation has
|
||||
// permission to read the indices the datafeed is going to read from
|
||||
if (licenseState.isAuthAllowed()) {
|
||||
useSecondaryAuthIfAvailable(securityContext, () -> {
|
||||
final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]);
|
||||
|
||||
final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]);
|
||||
final String username = securityContext.getUser().principal();
|
||||
final HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
|
||||
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
|
||||
privRequest.username(username);
|
||||
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
|
||||
|
||||
final String username = securityContext.getUser().principal();
|
||||
final HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
|
||||
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
|
||||
privRequest.username(username);
|
||||
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
|
||||
final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(indices);
|
||||
|
||||
final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(indices);
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, request, r, state, listener),
|
||||
listener::onFailure);
|
||||
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, request, r, state, listener),
|
||||
listener::onFailure);
|
||||
|
||||
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
|
||||
} else {
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME);
|
||||
}
|
||||
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
},
|
||||
e -> {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
|
||||
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
|
||||
} else {
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME);
|
||||
}
|
||||
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
},
|
||||
e -> {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
|
||||
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
|
||||
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
if (RemoteClusterLicenseChecker.containsRemoteIndex(request.getDatafeed().getIndices())) {
|
||||
getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
|
||||
} else {
|
||||
executeAsyncWithOrigin(client,
|
||||
ML_ORIGIN,
|
||||
GetRollupIndexCapsAction.INSTANCE,
|
||||
new GetRollupIndexCapsAction.Request(indices),
|
||||
getRollupIndexCapsActionHandler);
|
||||
}
|
||||
);
|
||||
if (RemoteClusterLicenseChecker.containsRemoteIndex(request.getDatafeed().getIndices())) {
|
||||
getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
|
||||
} else {
|
||||
executeAsyncWithOrigin(client,
|
||||
ML_ORIGIN,
|
||||
GetRollupIndexCapsAction.INSTANCE,
|
||||
new GetRollupIndexCapsAction.Request(indices),
|
||||
getRollupIndexCapsActionHandler);
|
||||
}
|
||||
|
||||
});
|
||||
} else {
|
||||
putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
|
||||
}
|
||||
|
|
|
@ -23,12 +23,14 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
|
||||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
|
||||
|
@ -38,6 +40,8 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
|
||||
|
||||
public class TransportUpdateDatafeedAction extends
|
||||
TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {
|
||||
|
||||
|
@ -45,6 +49,7 @@ public class TransportUpdateDatafeedAction extends
|
|||
private final DatafeedConfigProvider datafeedConfigProvider;
|
||||
private final JobConfigProvider jobConfigProvider;
|
||||
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
|
||||
private final SecurityContext securityContext;
|
||||
|
||||
@Inject
|
||||
public TransportUpdateDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
|
@ -58,6 +63,8 @@ public class TransportUpdateDatafeedAction extends
|
|||
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
|
||||
this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
|
||||
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
|
||||
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
|
||||
new SecurityContext(settings, threadPool.getThreadContext()) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -78,15 +85,12 @@ public class TransportUpdateDatafeedAction extends
|
|||
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
|
||||
|
||||
// Check datafeed is stopped
|
||||
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
|
||||
if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) {
|
||||
listener.onFailure(ExceptionsHelper.conflictStatusException(
|
||||
Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE,
|
||||
request.getUpdate().getId(), DatafeedState.STARTED)));
|
||||
Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE,
|
||||
request.getUpdate().getId(), DatafeedState.STARTED)));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -94,14 +98,17 @@ public class TransportUpdateDatafeedAction extends
|
|||
|
||||
CheckedConsumer<BulkByScrollResponse, Exception> updateConsumer =
|
||||
unused -> {
|
||||
datafeedConfigProvider.updateDatefeedConfig(
|
||||
datafeedId,
|
||||
request.getUpdate(),
|
||||
headers,
|
||||
jobConfigProvider::validateDatafeedJob,
|
||||
ActionListener.wrap(
|
||||
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
|
||||
listener::onFailure));
|
||||
useSecondaryAuthIfAvailable(securityContext, () -> {
|
||||
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
|
||||
datafeedConfigProvider.updateDatefeedConfig(
|
||||
request.getUpdate().getId(),
|
||||
request.getUpdate(),
|
||||
headers,
|
||||
jobConfigProvider::validateDatafeedJob,
|
||||
ActionListener.wrap(
|
||||
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
|
||||
listener::onFailure));
|
||||
});
|
||||
};
|
||||
|
||||
CheckedConsumer<Boolean, Exception> deleteTimingStatsAndUpdateConsumer =
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication;
|
||||
|
||||
public final class SecondaryAuthorizationUtils {
|
||||
|
||||
private SecondaryAuthorizationUtils() {}
|
||||
|
||||
/**
|
||||
* This executes the supplied runnable inside the secondary auth context if it exists;
|
||||
*/
|
||||
public static void useSecondaryAuthIfAvailable(SecurityContext securityContext, Runnable runnable) {
|
||||
if (securityContext == null) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
SecondaryAuthentication secondaryAuth = securityContext.getSecondaryAuthentication();
|
||||
if (secondaryAuth != null) {
|
||||
runnable = secondaryAuth.wrap(runnable);
|
||||
}
|
||||
runnable.run();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue