[ML] Allow model_memory_limit to be reduced (elastic/x-pack-elasticsearch#3998)
Up to now a job update that reduces the model memory limit was not allowed. However, there could definitely be cases where reducing the limit is necessary and reasonable. This commit makes it possible to decrease the limit as long as it does not go below the current memory usage. We obtain the latter from the model size stats. The conditions under which updating the model_memory_limit is not allowed are now: - when the job is open - latest model_size_stats.model_bytes < new value relates elastic/x-pack-elasticsearch#2461 Original commit: elastic/x-pack-elasticsearch@5b35923590
This commit is contained in:
parent
11cd9097b6
commit
1ed31af2c6
|
@ -67,7 +67,7 @@ effect.
|
|||
[NOTE]
|
||||
--
|
||||
* You can update the `analysis_limits` only while the job is closed.
|
||||
* The `model_memory_limit` property value cannot be decreased.
|
||||
* The `model_memory_limit` property value cannot be decreased below the current usage.
|
||||
* If the `memory_status` property in the `model_size_stats` object has a value
|
||||
of `hard_limit`, this means that it was unable to process some data. You might
|
||||
want to re-run this job with an increased `model_memory_limit`.
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
|
|||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -18,6 +19,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -61,6 +63,9 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
|
|||
this.jobId = jobId;
|
||||
this.update = update;
|
||||
this.isInternal = isInternal;
|
||||
if (MetaData.ALL.equals(jobId)) {
|
||||
throw ExceptionsHelper.badRequestException("Cannot update more than 1 job at a time");
|
||||
}
|
||||
}
|
||||
|
||||
public Request() {
|
||||
|
|
|
@ -445,9 +445,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
if (establishedModelMemory != null && establishedModelMemory > 0) {
|
||||
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
|
||||
}
|
||||
Long modelMemoryLimit = (analysisLimits != null) ? analysisLimits.getModelMemoryLimit() : null;
|
||||
return ByteSizeUnit.MB.toBytes((modelMemoryLimit != null) ? modelMemoryLimit : JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT)
|
||||
+ PROCESS_MEMORY_OVERHEAD.getBytes();
|
||||
return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -11,14 +11,12 @@ import org.elasticsearch.common.ParseField;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -53,14 +51,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prior to 6.1 a default model_memory_limit was not enforced in Java.
|
||||
* The default of 4GB was used in the C++ code.
|
||||
* If model_memory_limit is not defined for a job then the
|
||||
* job was created before 6.1 and a value of 4GB is assumed.
|
||||
*/
|
||||
static final long UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT = 4096;
|
||||
|
||||
private final String jobId;
|
||||
private final List<String> groups;
|
||||
private final String description;
|
||||
|
@ -356,23 +346,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
if (analysisLimits != null) {
|
||||
AnalysisLimits validatedLimits = AnalysisLimits.validateAndSetDefaults(analysisLimits, maxModelMemoryLimit,
|
||||
AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB);
|
||||
|
||||
Long oldMemoryLimit;
|
||||
if (source.getAnalysisLimits() != null) {
|
||||
oldMemoryLimit = source.getAnalysisLimits().getModelMemoryLimit() != null ?
|
||||
source.getAnalysisLimits().getModelMemoryLimit()
|
||||
: UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT;
|
||||
} else {
|
||||
oldMemoryLimit = UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT;
|
||||
}
|
||||
|
||||
if (validatedLimits.getModelMemoryLimit() < oldMemoryLimit) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED,
|
||||
new ByteSizeValue(oldMemoryLimit, ByteSizeUnit.MB),
|
||||
new ByteSizeValue(validatedLimits.getModelMemoryLimit(), ByteSizeUnit.MB)));
|
||||
}
|
||||
|
||||
builder.setAnalysisLimits(validatedLimits);
|
||||
}
|
||||
if (renormalizationWindowDays != null) {
|
||||
|
|
|
@ -152,7 +152,8 @@ public final class Messages {
|
|||
"If the job is configured with Per-Partition Normalization enabled a detector must have a partition field";
|
||||
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''";
|
||||
public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED =
|
||||
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased; existing is {0}, update had {1}";
|
||||
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " +
|
||||
"current usage [{0}], update had [{1}]";
|
||||
public static final String JOB_CONFIG_DETECTOR_DUPLICATE_FIELD_NAME =
|
||||
"{0} and {1} cannot be the same: ''{2}''";
|
||||
public static final String JOB_CONFIG_DETECTOR_COUNT_DISALLOWED =
|
||||
|
|
|
@ -551,8 +551,8 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
// will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0.
|
||||
builder.setEstablishedModelMemory(0L);
|
||||
}
|
||||
assertEquals(ByteSizeUnit.MB.toBytes(JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(),
|
||||
builder.build().estimateMemoryFootprint());
|
||||
assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB)
|
||||
+ Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
|
||||
}
|
||||
|
||||
public void testEarliestValidTimestamp_GivenEmptyDataCounts() {
|
||||
|
|
|
@ -179,27 +179,6 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
assertTrue(update.isAutodetectProcessUpdate());
|
||||
}
|
||||
|
||||
public void testUpdateAnalysisLimitWithLowerValue() {
|
||||
Job.Builder jobBuilder = new Job.Builder("foo");
|
||||
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
|
||||
d1.setOverFieldName("mlcategory");
|
||||
Detector.Builder d2 = new Detector.Builder("min", "field");
|
||||
d2.setOverFieldName("host");
|
||||
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Arrays.asList(d1.build(), d2.build()));
|
||||
ac.setCategorizationFieldName("cat_field");
|
||||
jobBuilder.setAnalysisConfig(ac);
|
||||
jobBuilder.setDataDescription(new DataDescription.Builder());
|
||||
jobBuilder.setCreateTime(new Date());
|
||||
jobBuilder.setAnalysisLimits(new AnalysisLimits(42L, null));
|
||||
|
||||
JobUpdate update = new JobUpdate.Builder("foo").setAnalysisLimits(new AnalysisLimits(41L, null)).build();
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> update.mergeWithJob(jobBuilder.build(), new ByteSizeValue(0L)));
|
||||
assertEquals("Invalid update value for analysis_limits: model_memory_limit cannot be decreased; existing is 42mb, update had 41mb",
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
public void testUpdateAnalysisLimitWithValueGreaterThanMax() {
|
||||
Job.Builder jobBuilder = new Job.Builder("foo");
|
||||
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
|
||||
|
@ -235,16 +214,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
assertThat(updated.getAnalysisLimits().getModelMemoryLimit(), equalTo(2048L));
|
||||
assertThat(updated.getAnalysisLimits().getCategorizationExamplesLimit(), equalTo(5L));
|
||||
|
||||
JobUpdate updateWithDecreasedLimit = new JobUpdate.Builder("foo").setAnalysisLimits(new AnalysisLimits(1023L, null)).build();
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> updateWithDecreasedLimit.mergeWithJob(jobBuilder.build(), new ByteSizeValue(8000L, ByteSizeUnit.MB)));
|
||||
assertEquals("Invalid update value for analysis_limits: model_memory_limit cannot be decreased; existing is 1gb, update had 1023mb",
|
||||
e.getMessage());
|
||||
|
||||
JobUpdate updateAboveMaxLimit = new JobUpdate.Builder("foo").setAnalysisLimits(new AnalysisLimits(8000L, null)).build();
|
||||
|
||||
e = expectThrows(ElasticsearchStatusException.class,
|
||||
Exception e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> updateAboveMaxLimit.mergeWithJob(jobBuilder.build(), new ByteSizeValue(5000L, ByteSizeUnit.MB)));
|
||||
assertEquals("model_memory_limit [7.8gb] must be less than the value of the xpack.ml.max_model_memory_limit setting [4.8gb]",
|
||||
e.getMessage());
|
||||
|
|
|
@ -47,10 +47,6 @@ public class TransportUpdateJobAction extends TransportMasterNodeAction<UpdateJo
|
|||
|
||||
@Override
|
||||
protected void masterOperation(UpdateJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> listener) {
|
||||
if (request.getJobId().equals(MetaData.ALL)) {
|
||||
throw new IllegalArgumentException("Job Id " + MetaData.ALL + " cannot be for update");
|
||||
}
|
||||
|
||||
jobManager.updateJob(request, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
|
@ -44,10 +46,12 @@ import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
|
|||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -233,37 +237,70 @@ public class JobManager extends AbstractComponent {
|
|||
|
||||
public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
|
||||
Job job = getJobOrThrowIfUnknown(request.getJobId());
|
||||
validate(request.getJobUpdate(), job, isValid -> {
|
||||
if (isValid) {
|
||||
internalJobUpdate(request, actionListener);
|
||||
} else {
|
||||
actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + request.getJobId() + "]"));
|
||||
}
|
||||
}, actionListener::onFailure);
|
||||
validate(request.getJobUpdate(), job, ActionListener.wrap(
|
||||
nullValue -> internalJobUpdate(request, actionListener),
|
||||
actionListener::onFailure));
|
||||
}
|
||||
|
||||
private void validate(JobUpdate jobUpdate, Job job, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
|
||||
if (jobUpdate.getModelSnapshotId() != null) {
|
||||
jobProvider.getModelSnapshot(job.getId(), jobUpdate.getModelSnapshotId(), newModelSnapshot -> {
|
||||
if (newModelSnapshot == null) {
|
||||
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, jobUpdate.getModelSnapshotId(),
|
||||
job.getId());
|
||||
errorHandler.accept(new ResourceNotFoundException(message));
|
||||
}
|
||||
jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
|
||||
if (oldModelSnapshot != null && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
|
||||
String message = "Job [" + job.getId() + "] has a more recent model snapshot [" +
|
||||
oldModelSnapshot.result.getSnapshotId() + "]";
|
||||
errorHandler.accept(new IllegalArgumentException(message));
|
||||
private void validate(JobUpdate jobUpdate, Job job, ActionListener<Void> handler) {
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(client.threadPool().executor(
|
||||
MachineLearning.UTILITY_THREAD_POOL_NAME), true);
|
||||
validateModelSnapshotIdUpdate(job, jobUpdate.getModelSnapshotId(), chainTaskExecutor);
|
||||
validateAnalysisLimitsUpdate(job, jobUpdate.getAnalysisLimits(), chainTaskExecutor);
|
||||
chainTaskExecutor.execute(handler);
|
||||
}
|
||||
|
||||
private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, ChainTaskExecutor chainTaskExecutor) {
|
||||
if (modelSnapshotId != null) {
|
||||
chainTaskExecutor.add(listener -> {
|
||||
jobProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> {
|
||||
if (newModelSnapshot == null) {
|
||||
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId,
|
||||
job.getId());
|
||||
listener.onFailure(new ResourceNotFoundException(message));
|
||||
return;
|
||||
}
|
||||
handler.accept(true);
|
||||
}, errorHandler);
|
||||
}, errorHandler);
|
||||
} else {
|
||||
handler.accept(true);
|
||||
jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
|
||||
if (oldModelSnapshot != null
|
||||
&& newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
|
||||
String message = "Job [" + job.getId() + "] has a more recent model snapshot [" +
|
||||
oldModelSnapshot.result.getSnapshotId() + "]";
|
||||
listener.onFailure(new IllegalArgumentException(message));
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}, listener::onFailure);
|
||||
}, listener::onFailure);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, ChainTaskExecutor chainTaskExecutor) {
|
||||
if (newLimits == null || newLimits.getModelMemoryLimit() == null) {
|
||||
return;
|
||||
}
|
||||
Long newModelMemoryLimit = newLimits.getModelMemoryLimit();
|
||||
chainTaskExecutor.add(listener -> {
|
||||
if (isJobOpen(clusterService.state(), job.getId())) {
|
||||
listener.onFailure(ExceptionsHelper.badRequestException("Cannot update " + Job.ANALYSIS_LIMITS.getPreferredName()
|
||||
+ " while the job is open"));
|
||||
return;
|
||||
}
|
||||
jobProvider.modelSizeStats(job.getId(), modelSizeStats -> {
|
||||
if (modelSizeStats != null) {
|
||||
ByteSizeValue modelSize = new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES);
|
||||
if (newModelMemoryLimit < modelSize.getMb()) {
|
||||
listener.onFailure(ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED,
|
||||
new ByteSizeValue(modelSize.getMb(), ByteSizeUnit.MB),
|
||||
new ByteSizeValue(newModelMemoryLimit, ByteSizeUnit.MB))));
|
||||
return;
|
||||
}
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}, listener::onFailure);
|
||||
});
|
||||
}
|
||||
|
||||
private void internalJobUpdate(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
|
||||
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
|
||||
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* A utility that allows chained (serial) execution of a number of tasks
|
||||
* in async manner.
|
||||
*/
|
||||
public class ChainTaskExecutor {
|
||||
|
||||
public interface ChainTask {
|
||||
void run(ActionListener<Void> listener);
|
||||
}
|
||||
|
||||
private final ExecutorService executorService;
|
||||
private final boolean shortCircuit;
|
||||
private final LinkedList<ChainTask> tasks = new LinkedList<>();
|
||||
|
||||
public ChainTaskExecutor(ExecutorService executorService, boolean shortCircuit) {
|
||||
this.executorService = Objects.requireNonNull(executorService);
|
||||
this.shortCircuit = shortCircuit;
|
||||
}
|
||||
|
||||
public synchronized void add(ChainTask task) {
|
||||
tasks.add(task);
|
||||
}
|
||||
|
||||
public synchronized void execute(ActionListener<Void> listener) {
|
||||
if (tasks.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
ChainTask task = tasks.pop();
|
||||
executorService.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (shortCircuit) {
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
execute(listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
task.run(ActionListener.wrap(nullValue -> execute(listener), this::onFailure));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ChainTaskExecutorTests extends ESTestCase {
|
||||
|
||||
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
terminate(threadPool);
|
||||
} finally {
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testExecute() throws InterruptedException {
|
||||
final List<String> strings = new ArrayList<>();
|
||||
ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
|
||||
chainTaskExecutor.add(listener -> { strings.add("first"); listener.onResponse(null); });
|
||||
chainTaskExecutor.add(listener -> { strings.add("second"); listener.onResponse(null); });
|
||||
|
||||
chainTaskExecutor.execute(finalListener);
|
||||
|
||||
latch.await();
|
||||
|
||||
assertThat(strings, contains("first", "second", "last"));
|
||||
}
|
||||
|
||||
public void testExecute_GivenSingleFailureAndShortCircuit() throws InterruptedException {
|
||||
final List<String> strings = new ArrayList<>();
|
||||
ActionListener<Void> finalListener = createBlockingListener(() -> fail(),
|
||||
e -> assertThat(e.getMessage(), equalTo("some error")));
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true);
|
||||
chainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); });
|
||||
chainTaskExecutor.add(listener -> { throw new RuntimeException("some error"); });
|
||||
chainTaskExecutor.add(listener -> { strings.add("after"); listener.onResponse(null); });
|
||||
|
||||
chainTaskExecutor.execute(finalListener);
|
||||
|
||||
latch.await();
|
||||
|
||||
assertThat(strings, contains("before"));
|
||||
}
|
||||
|
||||
public void testExecute_GivenMultipleFailuresAndShortCircuit() throws InterruptedException {
|
||||
final List<String> strings = new ArrayList<>();
|
||||
ActionListener<Void> finalListener = createBlockingListener(() -> fail(),
|
||||
e -> assertThat(e.getMessage(), equalTo("some error 1")));
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true);
|
||||
chainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); });
|
||||
chainTaskExecutor.add(listener -> { throw new RuntimeException("some error 1"); });
|
||||
chainTaskExecutor.add(listener -> { throw new RuntimeException("some error 2"); });
|
||||
|
||||
chainTaskExecutor.execute(finalListener);
|
||||
|
||||
latch.await();
|
||||
|
||||
assertThat(strings, contains("before"));
|
||||
}
|
||||
|
||||
public void testExecute_GivenFailureAndNoShortCircuit() throws InterruptedException {
|
||||
final List<String> strings = new ArrayList<>();
|
||||
ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
|
||||
chainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); });
|
||||
chainTaskExecutor.add(listener -> { throw new RuntimeException("some error"); });
|
||||
chainTaskExecutor.add(listener -> { strings.add("after"); listener.onResponse(null); });
|
||||
|
||||
chainTaskExecutor.execute(finalListener);
|
||||
|
||||
latch.await();
|
||||
|
||||
assertThat(strings, contains("before", "after", "last"));
|
||||
}
|
||||
|
||||
public void testExecute_GivenNoTasksAdded() throws InterruptedException {
|
||||
final List<String> strings = new ArrayList<>();
|
||||
ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
|
||||
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
|
||||
|
||||
chainTaskExecutor.execute(finalListener);
|
||||
|
||||
latch.await();
|
||||
|
||||
assertThat(strings, contains("last"));
|
||||
}
|
||||
|
||||
private ActionListener<Void> createBlockingListener(Runnable runnable, Consumer<Exception> errorHandler) {
|
||||
return ActionListener.wrap(nullValue -> {
|
||||
runnable.run();
|
||||
latch.countDown();
|
||||
}, e -> {
|
||||
errorHandler.accept(e);
|
||||
latch.countDown();
|
||||
});
|
||||
}
|
||||
}
|
|
@ -277,7 +277,7 @@
|
|||
"terms": "foo,bar"
|
||||
},
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": 10
|
||||
"model_memory_limit": "10mb"
|
||||
},
|
||||
"renormalization_window_days": 1,
|
||||
"background_persist_interval": "2h",
|
||||
|
@ -309,9 +309,6 @@
|
|||
"enabled": false,
|
||||
"terms": "foobar"
|
||||
},
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": 20
|
||||
},
|
||||
"renormalization_window_days": 10,
|
||||
"background_persist_interval": "3h",
|
||||
"model_snapshot_retention_days": 30,
|
||||
|
@ -326,7 +323,6 @@
|
|||
- match: { description: "Post update description" }
|
||||
- match: { model_plot_config.enabled: false }
|
||||
- match: { model_plot_config.terms: "foobar" }
|
||||
- match: { analysis_limits.model_memory_limit: "20mb" }
|
||||
- match: { analysis_config.categorization_filters: ["cat3.*"] }
|
||||
- match: { analysis_config.detectors.0.rules.0.target_field_name: "airline" }
|
||||
- match: { analysis_config.detectors.0.detector_index: 0 }
|
||||
|
@ -338,16 +334,43 @@
|
|||
- match: { results_retention_days: 40 }
|
||||
|
||||
- do:
|
||||
catch: "/Invalid update value for analysis_limits: model_memory_limit cannot be decreased; existing is 20mb, update had 1mb/"
|
||||
catch: "/Cannot update analysis_limits while the job is open/"
|
||||
xpack.ml.update_job:
|
||||
job_id: jobs-crud-update-job
|
||||
body: >
|
||||
{
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "1mb"
|
||||
"model_memory_limit": "20mb"
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
xpack.ml.close_job:
|
||||
job_id: jobs-crud-update-job
|
||||
- match: { closed: true }
|
||||
|
||||
- do:
|
||||
xpack.ml.update_job:
|
||||
job_id: jobs-crud-update-job
|
||||
body: >
|
||||
{
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "20mb"
|
||||
}
|
||||
}
|
||||
- match: { analysis_limits.model_memory_limit: "20mb" }
|
||||
|
||||
- do:
|
||||
xpack.ml.update_job:
|
||||
job_id: jobs-crud-update-job
|
||||
body: >
|
||||
{
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "15mb"
|
||||
}
|
||||
}
|
||||
- match: { analysis_limits.model_memory_limit: "15mb" }
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.ml.update_job:
|
||||
|
@ -357,6 +380,65 @@
|
|||
"description":"Can't update all description"
|
||||
}
|
||||
|
||||
---
|
||||
"Test cannot decrease model_memory_limit below current usage":
|
||||
- do:
|
||||
xpack.ml.put_job:
|
||||
job_id: jobs-crud-model-memory-limit-decrease
|
||||
body: >
|
||||
{
|
||||
"job_id":"jobs-crud-model-memory-limit-decrease",
|
||||
"analysis_config" : {
|
||||
"bucket_span": "1h",
|
||||
"detectors" :[{"function":"count"}]
|
||||
},
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "50mb"
|
||||
},
|
||||
"data_description" : {
|
||||
"time_field":"time"
|
||||
}
|
||||
}
|
||||
- match: { job_id: "jobs-crud-model-memory-limit-decrease" }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: .ml-anomalies-shared
|
||||
type: doc
|
||||
id: jobs-crud-model-memory-limit-decrease_model_size_stats_1517443200000
|
||||
body:
|
||||
job_id: jobs-crud-model-memory-limit-decrease
|
||||
result_type: model_size_stats
|
||||
log_time: 1517443200000
|
||||
timestamp: 1517443200000
|
||||
model_bytes: 10000000
|
||||
|
||||
- do:
|
||||
indices.refresh: {}
|
||||
|
||||
- do:
|
||||
catch: /Invalid update value for analysis_limits[:] model_memory_limit cannot be decreased below current usage; current usage \[9mb\], update had \[5mb\]/
|
||||
xpack.ml.update_job:
|
||||
job_id: jobs-crud-model-memory-limit-decrease
|
||||
body: >
|
||||
{
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "5mb"
|
||||
}
|
||||
}
|
||||
|
||||
# Decreasing over current usage works
|
||||
- do:
|
||||
xpack.ml.update_job:
|
||||
job_id: jobs-crud-model-memory-limit-decrease
|
||||
body: >
|
||||
{
|
||||
"analysis_limits": {
|
||||
"model_memory_limit": "30mb"
|
||||
}
|
||||
}
|
||||
- match: { analysis_limits.model_memory_limit: "30mb" }
|
||||
|
||||
---
|
||||
"Test delete job that is referred by a datafeed":
|
||||
- do:
|
||||
|
|
|
@ -48,6 +48,7 @@ integTestRunner {
|
|||
'ml/jobs_crud/Test cannot create job with existing quantiles document',
|
||||
'ml/jobs_crud/Test cannot create job with existing result document',
|
||||
'ml/jobs_crud/Test cannot create job with model snapshot id set',
|
||||
'ml/jobs_crud/Test cannot decrease model_memory_limit below current usage',
|
||||
'ml/jobs_crud/Test get job API with non existing job id',
|
||||
'ml/jobs_crud/Test put job after closing results index',
|
||||
'ml/jobs_crud/Test put job after closing state index',
|
||||
|
|
Loading…
Reference in New Issue