[7.x] [ML] lay ground work for handling >1 result indices (#55892) (#56192)

* [ML] lay ground work for handling >1 result indices (#55892)

This commit removes all but one reference to `getInitialResultsIndexName`. 
This is to support more than one result index for a single job.
Benjamin Trent 2020-05-05 15:54:08 -04:00 committed by GitHub
parent 793f265451
commit e1c5ca421e
5 changed files with 293 additions and 84 deletions
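
At a high level, the change below stops assuming a job owns exactly one results index: deletion now resolves every concrete index behind the job's results alias, asks each index whether it holds any other job's documents, drops the purely dedicated indices outright, and falls back to delete-by-query only where results are shared. A minimal sketch of that per-index decision, assuming precomputed counts of other jobs' documents (all names here are illustrative, not the commit's):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the per-index deletion decision this commit introduces.
    // otherJobsDocCount[i] is assumed to hold the number of documents in
    // indices[i] that belong to jobs other than the one being deleted.
    final class ResultsIndexCleanupSketch {
        static final String DEFAULT_SHARED_INDEX = ".ml-anomalies-shared";

        // Returns the indices that are safe to drop wholesale; needDbq[0] is set
        // when a delete-by-query is still required for the remaining shared indices.
        static List<String> indicesToDelete(String[] indices, long[] otherJobsDocCount, boolean[] needDbq) {
            List<String> deletable = new ArrayList<>();
            for (int i = 0; i < indices.length; i++) {
                // the default shared index is never dropped outright, even when it
                // currently holds no other job's results
                if (otherJobsDocCount[i] > 0 || DEFAULT_SHARED_INDEX.equals(indices[i])) {
                    needDbq[0] = true;
                } else {
                    deletable.add(indices[i]);
                }
            }
            return deletable;
        }
    }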


@@ -689,21 +689,17 @@ public class MlJobIT extends ESRestTestCase {
         refreshAllIndices();
-        // check that the indices still exist but are empty
+        // check that the default shared index still exists but is empty
         String indicesAfterDelete = EntityUtils.toString(client().performRequest(
             new Request("GET", "/_cat/indices/" + AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*")).getEntity());
         assertThat(indicesAfterDelete, containsString(indexName));
-        assertThat(indicesAfterDelete, containsString(indexName + "-001"));
-        assertThat(indicesAfterDelete, containsString(indexName + "-002"));
+        // other results indices should be deleted as this test job ID is the only job in those indices
+        assertThat(indicesAfterDelete, not(containsString(indexName + "-001")));
+        assertThat(indicesAfterDelete, not(containsString(indexName + "-002")));
         assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity()),
             containsString("\"count\":0"));
-        assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName + "-001/_count")).getEntity()),
-            containsString("\"count\":0"));
-        assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName + "-002/_count")).getEntity()),
-            containsString("\"count\":0"));
         expectThrows(ResponseException.class, () ->
             client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
     }


@@ -16,7 +16,9 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.MultiSearchAction;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -316,15 +318,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
         ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
             response -> {
                 if (response && indexNames.get().length > 0) {
-                    logger.info("Running DBQ on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
-                    DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get());
+                    logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
                     ConstantScoreQueryBuilder query =
                         new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-                    request.setQuery(query);
-                    request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
-                    request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-                    request.setAbortOnVersionConflict(false);
-                    request.setRefresh(true);
+                    DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
+                        .setQuery(query)
+                        .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
+                        .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                        .setAbortOnVersionConflict(false)
+                        .setRefresh(true);
                     executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
                 } else { // We did not execute DBQ, no need to delete aliases or check the response
@@ -333,72 +335,97 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
             },
             failureHandler);

-        // Step 6. If we have any hits, that means we are NOT the only job on these indices, and should not delete the indices.
-        // If we do not have any hits, we can drop the indices and then skip the DBQ and alias deletion.
-        ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
-            searchResponse -> {
-                if (searchResponse == null || searchResponse.getHits().getTotalHits().value > 0) {
-                    deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
-                } else {
-                    logger.info("Running DELETE Index on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
-                    DeleteIndexRequest request = new DeleteIndexRequest(indexNames.get());
-                    request.indicesOptions(IndicesOptions.lenientExpandOpen());
-                    // If we have deleted the index, then we don't need to delete the aliases or run the DBQ
-                    executeAsyncWithOrigin(
-                        parentTaskClient.threadPool().getThreadContext(),
-                        ML_ORIGIN,
-                        request,
-                        ActionListener.<AcknowledgedResponse>wrap(
-                            response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
-                            failureHandler),
-                        parentTaskClient.admin().indices()::delete);
-                }
-            },
-            failure -> {
-                if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted
-                    deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
-                } else {
-                    failureHandler.accept(failure);
-                }
-            }
+        // Step 6. Handle each multi-search response. There should be one response for each underlying index.
+        // For each underlying index that contains results ONLY for the current job, we will delete that index.
+        // If there exists at least 1 index that has another job's results, we will run DBQ.
+        ActionListener<MultiSearchResponse> customIndexSearchHandler = ActionListener.wrap(
+            multiSearchResponse -> {
+                if (multiSearchResponse == null) {
+                    deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
+                    return;
+                }
+                String defaultSharedIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
+                    AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
+                List<String> indicesToDelete = new ArrayList<>();
+                boolean needToRunDBQTemp = false;
+                assert multiSearchResponse.getResponses().length == indexNames.get().length;
+                int i = 0;
+                for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
+                    if (item.isFailure()) {
+                        ++i;
+                        if (ExceptionsHelper.unwrapCause(item.getFailure()) instanceof IndexNotFoundException) {
+                            // index is already deleted, no need to take action against it
+                            continue;
+                        } else {
+                            failureHandler.accept(item.getFailure());
+                            return;
+                        }
+                    }
+                    SearchResponse searchResponse = item.getResponse();
+                    if (searchResponse.getHits().getTotalHits().value > 0 || indexNames.get()[i].equals(defaultSharedIndex)) {
+                        needToRunDBQTemp = true;
+                    } else {
+                        indicesToDelete.add(indexNames.get()[i]);
+                    }
+                    ++i;
+                }
+                final boolean needToRunDBQ = needToRunDBQTemp;
+                if (indicesToDelete.isEmpty()) {
+                    deleteByQueryExecutor.onResponse(needToRunDBQ);
+                    return;
+                }
+                logger.info("[{}] deleting the following indices directly {}", jobId, indicesToDelete);
+                DeleteIndexRequest request = new DeleteIndexRequest(indicesToDelete.toArray(new String[0]));
+                request.indicesOptions(IndicesOptions.lenientExpandOpenHidden());
+                executeAsyncWithOrigin(
+                    parentTaskClient.threadPool().getThreadContext(),
+                    ML_ORIGIN,
+                    request,
+                    ActionListener.<AcknowledgedResponse>wrap(
+                        response -> deleteByQueryExecutor.onResponse(needToRunDBQ), // only run DBQ if there is a shared index
+                        failureHandler),
+                    parentTaskClient.admin().indices()::delete);
+            },
+            failure -> {
+                if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted
+                    deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
+                } else {
+                    failureHandler.accept(failure);
+                }
+            }
         );

-        // Step 5. Determine if we are on shared indices by looking at whether the initial index was ".ml-anomalies-shared"
-        // or whether the indices that the job's results alias points to contain any documents from other jobs.
-        // TODO: this check is currently assuming that a job's results indices are either ALL shared or ALL
-        // dedicated to the job. We have considered functionality like rolling jobs that generate large
-        // volumes of results from shared to dedicated indices. On deletion such a job would have a mix of
-        // shared indices requiring DBQ and dedicated indices that could be simply dropped. The current
-        // functionality would apply DBQ to all these indices, which is safe but suboptimal. So this functionality
-        // should be revisited when we add rolling results index functionality, especially if we add the ability
-        // to switch a job over to a dedicated index for future results.
+        // Step 5. If we successfully find a job, gather information about its result indices.
+        // This will execute a multi-search action for every concrete index behind the job results alias.
+        // If there are no concrete indices, take no action and go to the next step.
         ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
-            builder -> {
-                Job job = builder.build();
-                indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
-                    IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
-                // The job may no longer be using the initial shared index, but if it started off on a
-                // shared index then it will still be on a shared index even if it's been reindexed
-                if (job.getInitialResultsIndexName()
-                    .equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
-                    // don't bother searching the index any further, we are on the default shared
-                    customIndexSearchHandler.onResponse(null);
-                } else if (indexNames.get().length == 0) {
-                    // don't bother searching the index any further - it's already been closed or deleted
-                    customIndexSearchHandler.onResponse(null);
-                } else {
-                    SearchSourceBuilder source = new SearchSourceBuilder()
-                        .size(1)
-                        .trackTotalHits(true)
-                        .query(QueryBuilders.boolQuery().filter(
-                            QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
-                    SearchRequest searchRequest = new SearchRequest(indexNames.get());
-                    searchRequest.source(source);
-                    executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
-                }
-            },
-            failureHandler
+            builder -> {
+                indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
+                    IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
+                if (indexNames.get().length == 0) {
+                    // don't bother searching the index any further - it's already been closed or deleted
+                    customIndexSearchHandler.onResponse(null);
+                    return;
+                }
+                MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
+                // It is important that the requests are in the same order as the index names.
+                // This is because responses are ordered according to their requests.
+                for (String indexName : indexNames.get()) {
+                    SearchSourceBuilder source = new SearchSourceBuilder()
+                        .size(0)
+                        // if we have just one hit we cannot delete the index
+                        .trackTotalHitsUpTo(1)
+                        .query(QueryBuilders.boolQuery().filter(
+                            QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
+                    multiSearchRequest.add(new SearchRequest(indexName).source(source));
+                }
+                executeAsyncWithOrigin(parentTaskClient,
+                    ML_ORIGIN,
+                    MultiSearchAction.INSTANCE,
+                    multiSearchRequest,
+                    customIndexSearchHandler);
+            },
+            failureHandler
         );

         // Step 4. Get the job as the initial result index name is required
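
The parallel `i` counter in the handler above is only safe because the msearch API returns exactly one response item per request, in request order. A standalone sketch of that pairing, with the client and index names assumed rather than taken from the commit:

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.search.MultiSearchRequest;
    import org.elasticsearch.action.search.MultiSearchResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Sketch (not from the commit): MultiSearchResponse items come back in the
    // same order their requests were added, so items[i] always describes indices[i].
    final class MsearchOrderSketch {
        static void checkIndices(Client client, String[] indices) {
            MultiSearchRequest msearch = new MultiSearchRequest();
            for (String index : indices) {
                msearch.add(new SearchRequest(index)
                    .source(new SearchSourceBuilder().size(0).trackTotalHitsUpTo(1)));
            }
            client.multiSearch(msearch, ActionListener.wrap(response -> {
                MultiSearchResponse.Item[] items = response.getResponses();
                for (int i = 0; i < items.length; i++) {
                    // items[i] pairs with indices[i]; a failed item may simply
                    // mean the index no longer exists
                    if (items[i].isFailure() == false) {
                        long hits = items[i].getResponse().getHits().getTotalHits().value;
                        System.out.println(indices[i] + " docs from other jobs: " + hits);
                    }
                }
            }, e -> { /* surface the failure to the caller in real code */ }));
        }
    }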


@@ -173,16 +173,18 @@ public class JobResultsProvider {
         SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
             .setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1),
                 CategorizerState.v54DocumentId(job.getId(), 1)))
+            .setTrackTotalHits(false)
             .setIndicesOptions(IndicesOptions.strictExpand());
         SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
             .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
+            .setTrackTotalHits(false)
             .setIndicesOptions(IndicesOptions.strictExpand());
-        String resultsIndexName = job.getInitialResultsIndexName();
-        SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
-            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+        SearchRequestBuilder resultDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")
+            .setIndicesOptions(IndicesOptions.lenientExpandHidden())
             .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
+            .setTrackTotalHits(false)
             .setSize(1);
         MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
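
Because a job's results may now live in any index under the results prefix, and those indices are hidden in 7.x, the existence check above searches the whole prefix pattern instead of the job's initial index, with size(1) and total-hit tracking disabled since only existence matters. The same pattern in isolation (index pattern and job id are assumed placeholders):

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.support.IndicesOptions;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Sketch (not the provider's code): does a job have any result document in
    // any results index? ".ml-anomalies-" mirrors AnomalyDetectorsIndex.jobResultsIndexPrefix().
    SearchRequest anyResultDoc = new SearchRequest(".ml-anomalies-*")
        .indicesOptions(IndicesOptions.lenientExpandHidden())
        .source(new SearchSourceBuilder()
            .query(QueryBuilders.termQuery("job_id", "my-job")) // assumed job id
            .trackTotalHits(false) // existence check only; counting is wasted work
            .size(1));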


@@ -5,22 +5,82 @@
  */
 package org.elasticsearch.xpack.ml.integration;

+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.OperationRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterApplierService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
+import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
+import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
+import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;

 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;

 /**
  * Test that ML does not touch unnecessary indices when removing job index aliases
  */
 public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {

+    private static long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis();
     private static final String UNRELATED_INDEX = "unrelated-data";

+    private JobResultsProvider jobResultsProvider;
+    private JobResultsPersister jobResultsPersister;
+
+    @Before
+    public void createComponents() {
+        Settings settings = nodeSettings(0);
+        ThreadPool tp = mock(ThreadPool.class);
+        ClusterSettings clusterSettings = new ClusterSettings(settings,
+            new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
+                MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
+                OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
+                AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
+                ClusterService.USER_DEFINED_METADATA,
+                ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
+                ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
+        ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
+        OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
+        ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
+        jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver());
+        jobResultsPersister = new JobResultsPersister(
+            originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node"));
+    }
+
     public void testUnrelatedIndexNotTouched() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(1);
         ensureStableCluster(1);
@@ -46,4 +106,106 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
         disableIndexBlock(UNRELATED_INDEX, IndexMetadata.SETTING_READ_ONLY);
     }

+    public void testDeleteDedicatedJobWithDataInShared() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(1);
+        ensureStableCluster(1);
+        String jobIdDedicated = "delete-test-job-dedicated";
+
+        Job.Builder job = createJob(jobIdDedicated, new ByteSizeValue(2, ByteSizeUnit.MB)).setResultsIndexName(jobIdDedicated);
+        client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet();
+        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet();
+        String dedicatedIndex = job.build().getInitialResultsIndexName();
+        awaitJobOpenedAndAssigned(job.getId(), null);
+        createBuckets(jobIdDedicated, 1, 10);
+
+        String jobIdShared = "delete-test-job-shared";
+        job = createJob(jobIdShared, new ByteSizeValue(2, ByteSizeUnit.MB));
+        client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet();
+        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet();
+        awaitJobOpenedAndAssigned(job.getId(), null);
+        createBuckets(jobIdShared, 1, 10);
+
+        // Manually switching over alias info
+        IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
+            .addAliasAction(IndicesAliasesRequest.AliasActions
+                .add()
+                .alias(AnomalyDetectorsIndex.jobResultsAliasedName(jobIdDedicated))
+                .isHidden(true)
+                .index(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared")
+                .writeIndex(false)
+                .filter(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobIdDedicated))))
+            .addAliasAction(IndicesAliasesRequest.AliasActions
+                .add()
+                .alias(AnomalyDetectorsIndex.resultsWriteAlias(jobIdDedicated))
+                .index(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared")
+                .isHidden(true)
+                .writeIndex(true))
+            .addAliasAction(IndicesAliasesRequest.AliasActions
+                .remove()
+                .alias(AnomalyDetectorsIndex.resultsWriteAlias(jobIdDedicated))
+                .index(dedicatedIndex));
+
+        client().admin().indices().aliases(aliasesRequest).actionGet();
+        createBuckets(jobIdDedicated, 11, 10);
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get();
+
+        AtomicReference<QueryPage<Bucket>> bucketHandler = new AtomicReference<>();
+        AtomicReference<Exception> failureHandler = new AtomicReference<>();
+        blockingCall(listener -> jobResultsProvider.buckets(jobIdDedicated,
+            new BucketsQueryBuilder().from(0).size(22),
+            listener::onResponse,
+            listener::onFailure,
+            client()), bucketHandler, failureHandler);
+        assertThat(failureHandler.get(), is(nullValue()));
+        assertThat(bucketHandler.get().count(), equalTo(22L));
+
+        DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(jobIdDedicated);
+        deleteJobRequest.setForce(true);
+        client().execute(DeleteJobAction.INSTANCE, deleteJobRequest).get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get();
+
+        // Make sure our shared index job is OK
+        bucketHandler = new AtomicReference<>();
+        failureHandler = new AtomicReference<>();
+        blockingCall(listener -> jobResultsProvider.buckets(jobIdShared,
+            new BucketsQueryBuilder().from(0).size(21),
+            listener::onResponse,
+            listener::onFailure,
+            client()), bucketHandler, failureHandler);
+        assertThat(failureHandler.get(), is(nullValue()));
+        assertThat(bucketHandler.get().count(), equalTo(11L));
+
+        // Make sure dedicated index is gone
+        assertThat(client().admin()
+            .indices()
+            .prepareGetIndex()
+            .setIndices(dedicatedIndex)
+            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
+            .get()
+            .indices().length, equalTo(0));
+
+        // Make sure all results referencing the dedicated job are gone
+        assertThat(client().prepareSearch()
+            .setIndices(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")
+            .setIndicesOptions(IndicesOptions.lenientExpandOpenHidden())
+            .setTrackTotalHits(true)
+            .setSize(0)
+            .setSource(SearchSourceBuilder.searchSource()
+                .query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobIdDedicated))))
+            .get()
+            .getHits()
+            .getTotalHits()
+            .value, equalTo(0L));
+    }
+
+    private void createBuckets(String jobId, int from, int count) {
+        JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true);
+        for (int i = from; i <= count + from; ++i) {
+            Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan);
+            builder.persistBucket(bucket);
+        }
+        builder.executeRequest();
+    }
 }
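
The alias switch in testDeleteDedicatedJobWithDataInShared simulates a job whose results begin in a dedicated index and continue in the shared one: the job's read alias becomes a hidden, job_id-filtered alias on the shared index, and the write alias moves there too. In isolation such a filtered alias looks like this (index, alias, and job names are placeholders, not the test's helpers):

    import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
    import org.elasticsearch.index.query.QueryBuilders;

    // Sketch: a hidden, filtered read alias that makes a shared index answer
    // queries for one job only.
    IndicesAliasesRequest aliasSketch = new IndicesAliasesRequest()
        .addAliasAction(IndicesAliasesRequest.AliasActions.add()
            .index(".ml-anomalies-shared")   // assumed shared index name
            .alias(".ml-anomalies-my-job")   // assumed per-job read alias
            .isHidden(true)
            .writeIndex(false)
            .filter(QueryBuilders.termQuery("job_id", "my-job")));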


@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.support;

 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -66,8 +67,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;

 import static org.hamcrest.Matchers.equalTo;
@@ -391,6 +394,25 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
         }
     }

+    protected static <T> void blockingCall(Consumer<ActionListener<T>> function,
+                                           AtomicReference<T> response,
+                                           AtomicReference<Exception> error) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        ActionListener<T> listener = ActionListener.wrap(
+            r -> {
+                response.set(r);
+                latch.countDown();
+            },
+            e -> {
+                error.set(e);
+                latch.countDown();
+            }
+        );
+        function.accept(listener);
+        latch.await();
+    }
+
     protected String awaitJobOpenedAndAssigned(String jobId, String queryNode) throws Exception {
         PersistentTasksClusterService persistentTasksClusterService =
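
For reference, the new blockingCall helper is used the way JobStorageDeletionTaskIT does above: it latches an ActionListener so an async call can be driven synchronously from a test. A minimal sketch of a call site (provider, query builder, and job name assumed):

    import java.util.concurrent.atomic.AtomicReference;
    import org.elasticsearch.xpack.core.action.util.QueryPage;
    import org.elasticsearch.xpack.core.ml.job.results.Bucket;

    // Sketch: fetch one page of buckets synchronously inside a test method.
    AtomicReference<QueryPage<Bucket>> buckets = new AtomicReference<>();
    AtomicReference<Exception> error = new AtomicReference<>();
    blockingCall(listener -> jobResultsProvider.buckets("my-job",
            new BucketsQueryBuilder().from(0).size(10),
            listener::onResponse,
            listener::onFailure,
            client()),
        buckets, error);
    // error.get() should be null; buckets.get() holds the page of results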