[7.x][ML] MlMemoryTracker should ignore analytics tasks without config (#46789) (#46811)

It is possible for a running analytics job that its config is removed
from the '.ml-config' index (perhaps the user deleted the entire index,
etc.). In that case the task remains without a matching config. I have
raised #46781 to discuss how to deal with this issue.

This commit focuses on `MlMemoryTracker` and changes it so that when
we get the configs for the running tasks we leniently ignore missing ones.
This at least means memory tracking will keep working for other jobs
if one or more are missing.

In addition, this commit makes the cleanup code for native analytics
tests more robust by explicitly stopping all jobs and force-stopping
if an error occurs. This helps so that a single failing test does
not cause other tests fail due to pending tasks.

Backport of #46789
This commit is contained in:
Dimitris Athanasiou 2019-09-18 16:35:25 +03:00 committed by GitHub
parent 0076083b35
commit cebe8da617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 11 deletions

View File

@ -57,6 +57,8 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
} }
private void cleanUpAnalytics() { private void cleanUpAnalytics() {
stopAnalyticsAndForceStopOnError();
for (DataFrameAnalyticsConfig config : analytics) { for (DataFrameAnalyticsConfig config : analytics) {
try { try {
assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true)); assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
@ -67,6 +69,20 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
} }
} }
private void stopAnalyticsAndForceStopOnError() {
try {
assertThat(stopAnalytics("*").isStopped(), is(true));
} catch (Exception e) {
logger.error("Failed to stop data frame analytics jobs; trying force", e);
try {
assertThat(forceStopAnalytics("*").isStopped(), is(true));
} catch (Exception e2) {
logger.error("Force-stopping data frame analytics jobs failed", e2);
}
throw new RuntimeException("Had to resort to force-stopping jobs, something went wrong?", e);
}
}
protected void registerAnalytics(DataFrameAnalyticsConfig config) { protected void registerAnalytics(DataFrameAnalyticsConfig config) {
if (analytics.add(config) == false) { if (analytics.add(config) == false) {
throw new IllegalArgumentException("analytics config [" + config.getId() + "] is already registered"); throw new IllegalArgumentException("analytics config [" + config.getId() + "] is already registered");
@ -93,6 +109,12 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
return client().execute(StopDataFrameAnalyticsAction.INSTANCE, request).actionGet(); return client().execute(StopDataFrameAnalyticsAction.INSTANCE, request).actionGet();
} }
protected StopDataFrameAnalyticsAction.Response forceStopAnalytics(String id) {
StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
request.setForce(true);
return client().execute(StopDataFrameAnalyticsAction.INSTANCE, request).actionGet();
}
protected void waitUntilAnalyticsIsStopped(String id) throws Exception { protected void waitUntilAnalyticsIsStopped(String id) throws Exception {
waitUntilAnalyticsIsStopped(id, TimeValue.timeValueSeconds(30)); waitUntilAnalyticsIsStopped(id, TimeValue.timeValueSeconds(30));
} }

View File

@ -562,7 +562,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
MemoryUsageEstimationProcessManager memoryEstimationProcessManager = MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
new MemoryUsageEstimationProcessManager( new MemoryUsageEstimationProcessManager(
threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client); DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry);
assert client instanceof NodeClient; assert client instanceof NodeClient;
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager( DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager(
(NodeClient) client, dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor); (NodeClient) client, dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor);

View File

@ -5,18 +5,31 @@
*/ */
package org.elasticsearch.xpack.ml.dataframe.persistence; package org.elasticsearch.xpack.ml.dataframe.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
@ -26,11 +39,15 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -38,6 +55,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class DataFrameAnalyticsConfigProvider { public class DataFrameAnalyticsConfigProvider {
private static final Logger logger = LogManager.getLogger(DataFrameAnalyticsConfigProvider.class);
private static final int MAX_CONFIGS_SIZE = 10000; private static final int MAX_CONFIGS_SIZE = 10000;
private static final Map<String, String> TO_XCONTENT_PARAMS; private static final Map<String, String> TO_XCONTENT_PARAMS;
@ -50,9 +69,11 @@ public class DataFrameAnalyticsConfigProvider {
} }
private final Client client; private final Client client;
private final NamedXContentRegistry xContentRegistry;
public DataFrameAnalyticsConfigProvider(Client client) { public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.xContentRegistry = xContentRegistry;
} }
public void put(DataFrameAnalyticsConfig config, Map<String, String> headers, ActionListener<IndexResponse> listener) { public void put(DataFrameAnalyticsConfig config, Map<String, String> headers, ActionListener<IndexResponse> listener) {
@ -119,4 +140,54 @@ public class DataFrameAnalyticsConfigProvider {
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsAction.INSTANCE, request, ActionListener.wrap( executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsAction.INSTANCE, request, ActionListener.wrap(
response -> listener.onResponse(response.getResources().results()), listener::onFailure)); response -> listener.onResponse(response.getResources().results()), listener::onFailure));
} }
/**
* Unlike {@link #getMultiple(String, boolean, ActionListener)} this method tries to get the configs that match jobs with tasks.
* It expects concrete ids and it does not throw if there is no config for a given id.
*/
public void getConfigsForJobsWithTasksLeniently(Set<String> jobsWithTask, ActionListener<List<DataFrameAnalyticsConfig>> listener) {
BoolQueryBuilder query = QueryBuilders.boolQuery();
query.filter(QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE));
query.filter(QueryBuilders.termsQuery(DataFrameAnalyticsConfig.ID.getPreferredName(), jobsWithTask));
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.configIndexName());
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.source().size(DataFrameAnalyticsConfigProvider.MAX_CONFIGS_SIZE);
searchRequest.source().query(query);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
searchRequest,
new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
SearchHit[] hits = searchResponse.getHits().getHits();
List<DataFrameAnalyticsConfig> configs = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
BytesReference sourceBytes = hit.getSourceRef();
try (InputStream stream = sourceBytes.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
configs.add(DataFrameAnalyticsConfig.LENIENT_PARSER.apply(parser, null).build());
} catch (IOException e) {
listener.onFailure(e);
}
}
Set<String> tasksWithoutConfigs = new HashSet<>(jobsWithTask);
tasksWithoutConfigs.removeAll(configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toList()));
if (tasksWithoutConfigs.isEmpty() == false) {
logger.warn("Data frame analytics tasks {} have no configs", tasksWithoutConfigs);
}
listener.onResponse(configs);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
client::search);
}
} }

View File

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
@ -352,10 +353,10 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
return; return;
} }
String startedJobIds = mlDataFrameAnalyticsJobTasks.stream() Set<String> jobsWithTasks = mlDataFrameAnalyticsJobTasks.stream().map(
.map(task -> ((StartDataFrameAnalyticsAction.TaskParams) task.getParams()).getId()).sorted().collect(Collectors.joining(",")); task -> ((StartDataFrameAnalyticsAction.TaskParams) task.getParams()).getId()).collect(Collectors.toSet());
configProvider.getMultiple(startedJobIds, false, ActionListener.wrap( configProvider.getConfigsForJobsWithTasksLeniently(jobsWithTasks, ActionListener.wrap(
analyticsConfigs -> { analyticsConfigs -> {
for (DataFrameAnalyticsConfig analyticsConfig : analyticsConfigs) { for (DataFrameAnalyticsConfig analyticsConfig : analyticsConfigs) {
memoryRequirementByDataFrameAnalyticsJob.put(analyticsConfig.getId(), memoryRequirementByDataFrameAnalyticsJob.put(analyticsConfig.getId(),

View File

@ -30,6 +30,7 @@ import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -39,7 +40,6 @@ import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -122,7 +122,7 @@ public class MlMemoryTrackerTests extends ESTestCase {
String jobId = "job" + i; String jobId = "job" + i;
verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any()); verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(), any());
} }
verify(configProvider, times(1)).getMultiple(eq(String.join(",", allIds)), eq(false), any()); verify(configProvider, times(1)).getConfigsForJobsWithTasksLeniently(eq(new HashSet<>(allIds)), any());
} else { } else {
verify(jobResultsProvider, never()).getEstablishedMemoryUsage(anyString(), any(), any(), any(), any()); verify(jobResultsProvider, never()).getEstablishedMemoryUsage(anyString(), any(), any(), any(), any());
} }
@ -161,10 +161,10 @@ public class MlMemoryTrackerTests extends ESTestCase {
doAnswer(invocation -> { doAnswer(invocation -> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ActionListener<List<DataFrameAnalyticsConfig>> listener = ActionListener<List<DataFrameAnalyticsConfig>> listener =
(ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[2]; (ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[1];
listener.onFailure(new IllegalArgumentException("computer says no")); listener.onFailure(new IllegalArgumentException("computer says no"));
return null; return null;
}).when(configProvider).getMultiple(anyString(), anyBoolean(), any()); }).when(configProvider).getConfigsForJobsWithTasksLeniently(any(), any());
AtomicBoolean gotErrorResponse = new AtomicBoolean(false); AtomicBoolean gotErrorResponse = new AtomicBoolean(false);
memoryTracker.refresh(persistentTasks, memoryTracker.refresh(persistentTasks,
@ -177,10 +177,10 @@ public class MlMemoryTrackerTests extends ESTestCase {
doAnswer(invocation -> { doAnswer(invocation -> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ActionListener<List<DataFrameAnalyticsConfig>> listener = ActionListener<List<DataFrameAnalyticsConfig>> listener =
(ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[2]; (ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[1];
listener.onResponse(Collections.emptyList()); listener.onResponse(Collections.emptyList());
return null; return null;
}).when(configProvider).getMultiple(anyString(), anyBoolean(), any()); }).when(configProvider).getConfigsForJobsWithTasksLeniently(any(), any());
AtomicBoolean gotSuccessResponse = new AtomicBoolean(false); AtomicBoolean gotSuccessResponse = new AtomicBoolean(false);
memoryTracker.refresh(persistentTasks, memoryTracker.refresh(persistentTasks,