Preserve parent task id for ml transform (#55124)

This change ensures that internal client requests spawned by the
transform persistent task executor and that use the end user security
credentials, have the parent task id assigned. The objective here is
to permit auditing (as well as tracking for debugging purposes) of all
the end-user requests executed on its behalf by persistent tasks.
Because transform tasks already implements graceful shutdown of the
child tasks, this change does not interfere with that by opting out of
the persistent task cancellation of child tasks.

Relates #55046 #54943 #52314
Closes #54957
This commit is contained in:
Albert Zaharovits 2020-04-14 18:43:47 +03:00 committed by GitHub
parent 12130843ca
commit 7f35b927d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 50 additions and 27 deletions

View File

@ -294,7 +294,6 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
TransformCheckpointService checkpointService = new TransformCheckpointService(
client,
settings,
clusterService,
configManager,

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -54,15 +55,17 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
private final TransformConfigManager transformConfigManager;
private final TransformCheckpointService transformCheckpointService;
private final Client client;
@Inject
public TransportGetTransformStatsAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
TransformServices transformServices
TransformServices transformServices,
Client client
) {
this(GetTransformStatsAction.NAME, transportService, actionFilters, clusterService, transformServices);
this(GetTransformStatsAction.NAME, transportService, actionFilters, clusterService, transformServices, client);
}
protected TransportGetTransformStatsAction(
@ -70,11 +73,13 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
TransformServices transformServices
TransformServices transformServices,
Client client
) {
super(name, clusterService, transportService, actionFilters, Request::new, Response::new, Response::new, ThreadPool.Names.SAME);
this.transformConfigManager = transformServices.getConfigManager();
this.transformCheckpointService = transformServices.getCheckpointService();
this.client = client;
}
@Override
@ -248,6 +253,7 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
private void populateSingleStoppedTransformStat(TransformStoredDoc transform, ActionListener<TransformCheckpointingInfo> listener) {
transformCheckpointService.getCheckpointingInfo(
client,
transform.getId(),
transform.getTransformState().getCheckpoint(),
transform.getTransformState().getPosition(),

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.transform.action.compat;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.transport.TransportService;
@ -21,8 +22,9 @@ public class TransportGetTransformStatsActionDeprecated extends TransportGetTran
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
TransformServices transformServices
TransformServices transformServices,
Client client
) {
super(GetTransformStatsActionDeprecated.NAME, transportService, actionFilters, clusterService, transformServices);
super(GetTransformStatsActionDeprecated.NAME, transportService, actionFilters, clusterService, transformServices, client);
}
}

View File

@ -32,25 +32,22 @@ public class TransformCheckpointService {
private static final Logger logger = LogManager.getLogger(TransformCheckpointService.class);
private final Client client;
private final TransformConfigManager transformConfigManager;
private final TransformAuditor transformAuditor;
private final RemoteClusterResolver remoteClusterResolver;
public TransformCheckpointService(
final Client client,
final Settings settings,
final ClusterService clusterService,
final TransformConfigManager transformConfigManager,
TransformAuditor transformAuditor
) {
this.client = client;
this.transformConfigManager = transformConfigManager;
this.transformAuditor = transformAuditor;
this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());
}
public CheckpointProvider getCheckpointProvider(final TransformConfig transformConfig) {
public CheckpointProvider getCheckpointProvider(final Client client, final TransformConfig transformConfig) {
if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) {
return new TimeBasedCheckpointProvider(
client,
@ -74,6 +71,7 @@ public class TransformCheckpointService {
* @param listener listener to retrieve the result
*/
public void getCheckpointingInfo(
final Client client,
final String transformId,
final long lastCheckpointNumber,
final TransformIndexerPosition nextCheckpointPosition,
@ -83,7 +81,7 @@ public class TransformCheckpointService {
// we need to retrieve the config first before we can defer the rest to the corresponding provider
transformConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(transformConfig -> {
getCheckpointProvider(transformConfig).getCheckpointingInfo(
getCheckpointProvider(client, transformConfig).getCheckpointingInfo(
lastCheckpointNumber,
nextCheckpointPosition,
nextCheckpointProgress,

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.transform.transforms;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@ -24,7 +24,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
class ClientTransformIndexerBuilder {
private Client client;
private ParentTaskAssigningClient parentTaskClient;
private TransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformAuditor auditor;
@ -44,16 +44,16 @@ class ClientTransformIndexerBuilder {
}
ClientTransformIndexer build(Executor executor, TransformContext context) {
CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(transformConfig);
CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig);
return new ClientTransformIndexer(
executor,
transformsConfigManager,
checkpointProvider,
new TransformProgressGatherer(client),
new TransformProgressGatherer(parentTaskClient),
new AtomicReference<>(this.indexerState),
initialPosition,
client,
parentTaskClient,
auditor,
initialStats,
transformConfig,
@ -72,8 +72,8 @@ class ClientTransformIndexerBuilder {
return this;
}
ClientTransformIndexerBuilder setClient(Client client) {
this.client = client;
ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClient) {
this.parentTaskClient = parentTaskClient;
return this;
}

View File

@ -226,9 +226,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
//
// We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again
// later if they want.
final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setAuditor(auditor)
.setClient(client)
.setClient(buildTask.getParentTaskClient())
.setTransformsCheckpointService(transformServices.getCheckpointService())
.setTransformsConfigManager(transformServices.getConfigManager());
@ -346,7 +345,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
if (config.isValid()) {
indexerBuilder.setTransformConfig(config);
SchemaUtil.getDestinationFieldMappings(client, config.getDestination().getIndex(), getFieldMappingsListener);
SchemaUtil.getDestinationFieldMappings(buildTask.getParentTaskClient(), config.getDestination().getIndex(),
getFieldMappingsListener);
} else {
markAsFailed(buildTask, TransformMessages.getMessage(TransformMessages.TRANSFORM_CONFIGURATION_INVALID, transformId));
}
@ -368,7 +368,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
);
// <1> Check the index templates are installed
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, buildTask.getParentTaskClient(),
templateCheckListener);
}
private static IndexerState currentIndexerState(TransformState previousState) {
@ -440,6 +441,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
type,
action,
parentTaskId,
client,
persistentTask.getParams(),
(TransformState) persistentTask.getState(),
transformServices.getSchedulerEngine(),

View File

@ -13,6 +13,8 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@ -51,6 +53,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING };
public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule";
private final ParentTaskAssigningClient parentTaskClient;
private final TransformTaskParams transform;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
@ -65,6 +68,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
String type,
String action,
TaskId parentTask,
Client client,
TransformTaskParams transform,
TransformState state,
SchedulerEngine schedulerEngine,
@ -73,6 +77,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
Map<String, String> headers
) {
super(id, type, action, TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
this.parentTaskClient = new ParentTaskAssigningClient(client, parentTask);
this.transform = transform;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
@ -106,6 +111,10 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
this.context = new TransformContext(initialTaskState, initialReason, initialCheckpoint, this);
}
public ParentTaskAssigningClient getParentTaskClient() {
return parentTaskClient;
}
public String getTransformId() {
return transform.getId();
}
@ -170,6 +179,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
ClientTransformIndexer indexer = getIndexer();
if (indexer == null) {
transformsCheckpointService.getCheckpointingInfo(
parentTaskClient,
transform.getId(),
context.getCheckpoint(),
initialPosition,
@ -410,6 +420,12 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
}
}
@Override
public boolean shouldCancelChildrenOnCancellation() {
// shutdown implements graceful shutdown of children
return false;
}
/**
* Attempt to gracefully cleanup the transform so it can be terminated.
* This tries to remove the job from the scheduler and completes the persistent task

View File

@ -142,7 +142,6 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
// use a mock for the checkpoint service
TransformAuditor mockAuditor = mock(TransformAuditor.class);
transformCheckpointService = new TransformCheckpointService(
mockClientForCheckpointing,
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
transformsConfigManager,
@ -370,6 +369,7 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
listener::onFailure
);
transformCheckpointService.getCheckpointingInfo(
mockClientForCheckpointing,
transformId,
lastCheckpointNumber,
nextCheckpointPosition,

View File

@ -228,7 +228,6 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
client,
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
transformsConfigManager,
@ -487,7 +486,6 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
TransformAuditor mockAuditor = mock(TransformAuditor.class);
IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
client,
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
transformsConfigManager,

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -73,7 +74,6 @@ public class TransformTaskTests extends ESTestCase {
TransformAuditor auditor = new MockTransformAuditor();
TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager();
TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(
client,
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
transformsConfigManager,
@ -96,6 +96,7 @@ public class TransformTaskTests extends ESTestCase {
"some_type",
"some_action",
TaskId.EMPTY_TASK_ID,
client,
new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false),
transformState,
mock(SchedulerEngine.class),
@ -109,7 +110,7 @@ public class TransformTaskTests extends ESTestCase {
transformTask.init(mock(PersistentTasksService.class), taskManager, "task-id", 42);
ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder();
indexerBuilder.setClient(client)
indexerBuilder.setClient(new ParentTaskAssigningClient(client, TaskId.EMPTY_TASK_ID))
.setTransformConfig(transformConfig)
.setAuditor(auditor)
.setTransformsConfigManager(transformsConfigManager)
@ -176,6 +177,7 @@ public class TransformTaskTests extends ESTestCase {
"some_type",
"some_action",
TaskId.EMPTY_TASK_ID,
client,
new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false),
transformState,
mock(SchedulerEngine.class),