From f06acdc219d56557cdbaba777b21a41c423cf848 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 21 Nov 2017 09:51:52 +0000 Subject: [PATCH 1/2] [ML] Improve the way ML jobs are allocated to nodes (elastic/x-pack-elasticsearch#2975) This change modifies the way ML jobs are assigned to nodes to primarily base the decision on the estimated memory footprint of the jobs. The memory footprint comes from the model size stats if the job has been running long enough, otherwise from the model memory limit. In addition, an allowance for the program code and stack is added. If insufficient information is available to base the allocation decision on memory requirements then the decision falls back to using simple job counts per node. relates elastic/x-pack-elasticsearch#546 Original commit: elastic/x-pack-elasticsearch@b276aedf2fca6aee7382ba8f9deef6034a5d0ec3 --- docs/en/settings/ml-settings.asciidoc | 7 ++ .../xpack/ml/MachineLearning.java | 36 +++++- .../xpack/ml/action/OpenJobAction.java | 112 ++++++++++++++---- .../xpack/ml/job/JobManager.java | 50 ++++++-- .../ml/job/UpdateJobProcessNotifier.java | 43 ++++--- .../xpack/ml/job/config/Job.java | 88 ++++++++++++-- .../xpack/ml/job/config/JobUpdate.java | 47 +++++++- .../xpack/ml/job/persistence/JobProvider.java | 41 ++++--- .../job/persistence/JobResultsPersister.java | 7 +- .../autodetect/AutodetectProcessManager.java | 3 +- .../output/AutoDetectResultProcessor.java | 68 +++++++++-- .../xpack/ml/MachineLearningTests.java | 40 +++++++ .../xpack/ml/action/OpenJobActionTests.java | 75 +++++++++--- .../AutodetectResultProcessorIT.java | 2 +- .../integration/BasicDistributedJobsIT.java | 24 ++-- .../integration/BasicRenormalizationIT.java | 9 ++ .../xpack/ml/integration/DatafeedJobsIT.java | 9 ++ .../ml/integration/EstablishedMemUsageIT.java | 85 +++++++++---- .../ml/integration/NetworkDisruptionIT.java | 4 +- .../ml/integration/OverallBucketsIT.java | 9 ++ .../integration/RestoreModelSnapshotIT.java | 13 +- .../xpack/ml/integration/TooManyJobsIT.java | 40 +++++-- .../xpack/ml/job/config/JobTests.java | 39 +++++- .../xpack/ml/job/config/JobUpdateTests.java | 3 + .../AutoDetectResultProcessorTests.java | 53 ++++++++- .../xpack/ml/support/BaseMlIntegTestCase.java | 17 ++- 26 files changed, 747 insertions(+), 177 deletions(-) diff --git a/docs/en/settings/ml-settings.asciidoc b/docs/en/settings/ml-settings.asciidoc index bedc09c6332..c0d4378a771 100644 --- a/docs/en/settings/ml-settings.asciidoc +++ b/docs/en/settings/ml-settings.asciidoc @@ -40,6 +40,13 @@ default behavior. `xpack.ml.max_open_jobs`:: The maximum number of jobs that can run on a node. Defaults to `10`. +`xpack.ml.max_machine_memory_percent`:: +The maximum percentage of the machine's memory that {ml} may use for running +analytics processes. (These processes are separate to the {es} JVM.) Defaults to +`30` percent. The limit is based on the total memory of the machine, not current +free memory. Jobs will not be allocated to a node if doing so would cause the +estimated memory use of {ml} jobs to exceed the limit. + `xpack.ml.max_model_memory_limit`:: The maximum `model_memory_limit` property value that can be set for any job on this node. If you try to create a job with a `model_memory_limit` property value diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3d0ff7d719a..8327d5e4607 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -23,11 +23,14 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.monitor.os.OsProbe; +import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -138,6 +141,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -161,10 +165,13 @@ public class MachineLearning implements ActionPlugin { Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope); public static final String ML_ENABLED_NODE_ATTR = "ml.enabled"; public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs"; + public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); public static final Setting MAX_MODEL_MEMORY_LIMIT = Setting.memorySizeSetting("xpack.ml.max_model_memory_limit", new ByteSizeValue(0), Property.Dynamic, Property.NodeScope); + public static final Setting MAX_MACHINE_MEMORY_PERCENT = + Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope); public static final TimeValue STATE_PERSIST_RESTORE_TIMEOUT = TimeValue.timeValueMinutes(30); @@ -195,6 +202,7 @@ public class MachineLearning implements ActionPlugin { ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, MAX_MODEL_MEMORY_LIMIT, + MAX_MACHINE_MEMORY_PERCENT, ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING, ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, @@ -206,9 +214,10 @@ public class MachineLearning implements ActionPlugin { public Settings additionalSettings() { String mlEnabledNodeAttrName = "node.attr." + ML_ENABLED_NODE_ATTR; String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR; + String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR; if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { - disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName); + disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName); return Settings.EMPTY; } @@ -219,8 +228,10 @@ public class MachineLearning implements ActionPlugin { addMlNodeAttribute(additionalSettings, mlEnabledNodeAttrName, "true"); addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName, String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings))); + addMlNodeAttribute(additionalSettings, machineMemoryAttrName, + Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats()))); } else { - disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName); + disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName); } return additionalSettings.build(); } @@ -504,4 +515,25 @@ public class MachineLearning implements ActionPlugin { maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool"); return Arrays.asList(autoDetect, renormalizer, datafeed); } + + /** + * Find the memory size (in bytes) of the machine this node is running on. + * Takes container limits (as used by Docker for example) into account. + */ + static long machineMemoryFromStats(OsStats stats) { + long mem = stats.getMem().getTotal().getBytes(); + OsStats.Cgroup cgroup = stats.getCgroup(); + if (cgroup != null) { + String containerLimitStr = cgroup.getMemoryLimitInBytes(); + if (containerLimitStr != null) { + BigInteger containerLimit = new BigInteger(containerLimitStr); + if ((containerLimit.compareTo(BigInteger.valueOf(mem)) < 0 && containerLimit.compareTo(BigInteger.ZERO) > 0) + // mem < 0 means the value couldn't be obtained for some reason + || (mem < 0 && containerLimit.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) < 0)) { + mem = containerLimit.longValue(); + } + } + } + return mem; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 8ca53d8fe9e..3be9346254c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -77,6 +77,7 @@ import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -573,6 +574,7 @@ public class OpenJobAction extends Action unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + @@ -664,9 +675,14 @@ public class OpenJobAction extends Action reasons = new LinkedList<>(); - DiscoveryNode minLoadedNode = null; + long maxAvailableCount = Long.MIN_VALUE; + long maxAvailableMemory = Long.MIN_VALUE; + DiscoveryNode minLoadedNodeByCount = null; + DiscoveryNode minLoadedNodeByMemory = null; + // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe + // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs + boolean allocateByMemory = true; PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -697,22 +713,26 @@ public class OpenJobAction extends Action { - if (node.getId().equals(task.getExecutorNode()) == false) { - return false; + // find all the job tasks assigned to this node + Collection> assignedTasks = persistentTasks.findTasks(OpenJobAction.TASK_NAME, + task -> node.getId().equals(task.getExecutorNode())); + numberOfAssignedJobs = assignedTasks.size(); + for (PersistentTask assignedTask : assignedTasks) { + JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus(); + if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING + // previous executor node failed and current executor node didn't have the chance to set job status to OPENING + jobTaskState.isStatusStale(assignedTask)) { + ++numberOfAllocatingJobs; } - JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); - return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING - jobTaskState.isStatusStale(task); // previous executor node failed and - // current executor node didn't have the chance to set job status to OPENING - }).size(); - } else { - numberOfAssignedJobs = 0; - numberOfAllocatingJobs = 0; + String assignedJobId = ((JobParams) assignedTask.getParams()).getJobId(); + Job assignedJob = mlMetadata.getJobs().get(assignedJobId); + assert assignedJob != null; + assignedJobMemory += assignedJob.estimateMemoryFootprint(); + } } if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) { String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs + @@ -736,8 +756,8 @@ public class OpenJobAction extends Action 0) { + long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + long estimatedMemoryFootprint = job.estimateMemoryFootprint(); + long availableMemory = maxMlMemory - assignedJobMemory; + if (estimatedMemoryFootprint > availableMemory) { + String reason = "Not opening job [" + jobId + "] on node [" + node + + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory + + "], memory required by existing jobs [" + assignedJobMemory + + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + + if (maxAvailableMemory < availableMemory) { + maxAvailableMemory = availableMemory; + minLoadedNodeByMemory = node; + } + } else { + // If we cannot get the available memory on any machine in + // the cluster, fall back to simply allocating by job count + allocateByMemory = false; + logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", + jobId, node); + } } } + DiscoveryNode minLoadedNode = allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount; if (minLoadedNode != null) { logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); return new Assignment(minLoadedNode.getId(), ""); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 675e5de4a9a..6d844d17b14 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -19,6 +19,9 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.DeleteJobAction; @@ -38,6 +41,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -245,6 +249,7 @@ public class JobManager extends AbstractComponent { clusterService.submitStateUpdateTask("update-job-" + jobId, new AckedClusterStateUpdateTask(request, actionListener) { private volatile Job updatedJob; + private volatile boolean changeWasRequired; @Override protected PutJobAction.Response newResponse(boolean acknowledged) { @@ -255,16 +260,33 @@ public class JobManager extends AbstractComponent { public ClusterState execute(ClusterState currentState) throws Exception { Job job = getJobOrThrowIfUnknown(jobId, currentState); updatedJob = jobUpdate.mergeWithJob(job, maxModelMemoryLimit); + if (updatedJob.equals(job)) { + // nothing to do + return currentState; + } + changeWasRequired = true; return updateClusterState(updatedJob, true, currentState); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - PersistentTasksCustomMetaData persistentTasks = - newState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - JobState jobState = MlMetadata.getJobState(jobId, persistentTasks); - if (jobState == JobState.OPENED) { - updateJobProcessNotifier.submitJobUpdate(jobUpdate); + if (changeWasRequired) { + PersistentTasksCustomMetaData persistentTasks = + newState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + JobState jobState = MlMetadata.getJobState(jobId, persistentTasks); + if (jobState == JobState.OPENED) { + updateJobProcessNotifier.submitJobUpdate(jobUpdate); + } + } else { + logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + return jsonBuilder.string(); + } catch (IOException e) { + return "(unprintable due to " + e.getMessage() + ")"; + } + }); } } }); @@ -330,9 +352,10 @@ public class JobManager extends AbstractComponent { public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener actionListener, ModelSnapshot modelSnapshot) { + final ModelSizeStats modelSizeStats = modelSnapshot.getModelSizeStats(); final JobResultsPersister persister = new JobResultsPersister(settings, client); - // Step 2. After the model size stats is persisted, also persist the snapshot's quantiles and respond + // Step 3. After the model size stats is persisted, also persist the snapshot's quantiles and respond // ------- CheckedConsumer modelSizeStatsResponseHandler = response -> { persister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE, @@ -344,21 +367,20 @@ public class JobManager extends AbstractComponent { }, actionListener::onFailure)); }; - // Step 1. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time + // Step 2. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time // so that a search for the latest model size stats returns the reverted one. // ------- CheckedConsumer updateHandler = response -> { if (response) { - ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSnapshot.getModelSizeStats()) - .setLogTime(new Date()).build(); + ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSizeStats).setLogTime(new Date()).build(); persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap( modelSizeStatsResponseHandler::accept, actionListener::onFailure)); } }; - // Step 0. Kick off the chain of callbacks with the cluster state update + // Step 1. Do the cluster state update // ------- - clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), + Consumer clusterStateHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { @Override @@ -377,9 +399,15 @@ public class JobManager extends AbstractComponent { Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState); Job.Builder builder = new Job.Builder(job); builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); + builder.setEstablishedModelMemory(response); return updateClusterState(builder.build(), true, currentState); } }); + + // Step 0. Find the appropriate established model memory for the reverted job + // ------- + jobProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler, + actionListener::onFailure); } private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index aa570bb0267..e44c92f4ecf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -24,18 +24,15 @@ import java.util.concurrent.LinkedBlockingQueue; import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Request; import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Response; -public class UpdateJobProcessNotifier extends AbstractComponent - implements LocalNodeMasterListener { +public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener { private final Client client; private final ThreadPool threadPool; - private final LinkedBlockingQueue orderedJobUpdates = - new LinkedBlockingQueue<>(1000); + private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); private volatile ThreadPool.Cancellable cancellable; - public UpdateJobProcessNotifier(Settings settings, Client client, - ClusterService clusterService, ThreadPool threadPool) { + public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) { super(settings); this.client = client; this.threadPool = threadPool; @@ -62,12 +59,11 @@ public class UpdateJobProcessNotifier extends AbstractComponent stop(); } - void start() { - cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, - TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC); + private void start() { + cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC); } - void stop() { + private void stop() { orderedJobUpdates.clear(); ThreadPool.Cancellable cancellable = this.cancellable; @@ -82,20 +78,26 @@ public class UpdateJobProcessNotifier extends AbstractComponent return ThreadPool.Names.SAME; } - void processNextUpdate() { + private void processNextUpdate() { try { JobUpdate jobUpdate = orderedJobUpdates.poll(); if (jobUpdate != null) { - executeRemoteJob(jobUpdate); + executeRemoteJobIfNecessary(jobUpdate); } } catch (Exception e) { logger.error("Unable while processing next job update", e); } } + void executeRemoteJobIfNecessary(JobUpdate update) { + // Do nothing if the fields that the C++ needs aren't being updated + if (update.isAutodetectProcessUpdate()) { + executeRemoteJob(update); + } + } + void executeRemoteJob(JobUpdate update) { - Request request = new Request(update.getJobId(), update.getModelPlotConfig(), - update.getDetectorUpdates()); + Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates()); client.execute(UpdateProcessAction.INSTANCE, request, new ActionListener() { @Override @@ -110,15 +112,12 @@ public class UpdateJobProcessNotifier extends AbstractComponent @Override public void onFailure(Exception e) { if (e instanceof ResourceNotFoundException) { - logger.debug("Remote job [{}] not updated as it has been deleted", - update.getJobId()); - } else if (e.getMessage().contains("because job [" + update.getJobId() + - "] is not open") && e instanceof ElasticsearchStatusException) { - logger.debug("Remote job [{}] not updated as it is no longer open", - update.getJobId()); + logger.debug("Remote job [{}] not updated as it has been deleted", update.getJobId()); + } else if (e.getMessage().contains("because job [" + update.getJobId() + "] is not open") + && e instanceof ElasticsearchStatusException) { + logger.debug("Remote job [{}] not updated as it is no longer open", update.getJobId()); } else { - logger.error("Failed to update remote job [" + update.getJobId() + "]", - e); + logger.error("Failed to update remote job [" + update.getJobId() + "]", e); } } }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 656ef650e07..496398275bf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -10,7 +10,6 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.spi.Message; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -69,6 +68,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); public static final ParseField LAST_DATA_TIME = new ParseField("last_data_time"); + public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory"); public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config"); public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days"); public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval"); @@ -88,6 +88,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final int MAX_JOB_ID_LENGTH = 64; public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1); + public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB); static { PARSERS.put(MlParserType.METADATA, METADATA_PARSER); @@ -127,6 +128,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]"); }, LAST_DATA_TIME, ValueType.VALUE); + parser.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY); parser.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSERS.get(parserType), ANALYSIS_CONFIG); parser.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSERS.get(parserType), ANALYSIS_LIMITS); parser.declareObject(Builder::setDataDescription, DataDescription.PARSERS.get(parserType), DATA_DESCRIPTION); @@ -159,6 +161,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private final Date createTime; private final Date finishedTime; private final Date lastDataTime; + private final Long establishedModelMemory; private final AnalysisConfig analysisConfig; private final AnalysisLimits analysisLimits; private final DataDescription dataDescription; @@ -173,7 +176,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private final boolean deleted; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, Date createTime, - Date finishedTime, Date lastDataTime, + Date finishedTime, Date lastDataTime, Long establishedModelMemory, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, @@ -187,6 +190,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.createTime = createTime; this.finishedTime = finishedTime; this.lastDataTime = lastDataTime; + this.establishedModelMemory = establishedModelMemory; this.analysisConfig = analysisConfig; this.analysisLimits = analysisLimits; this.dataDescription = dataDescription; @@ -218,6 +222,12 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO createTime = new Date(in.readVLong()); finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null; + // TODO: set to V_6_1_0 after backporting + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + establishedModelMemory = in.readOptionalLong(); + } else { + establishedModelMemory = null; + } analysisConfig = new AnalysisConfig(in); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); dataDescription = in.readOptionalWriteable(DataDescription::new); @@ -319,6 +329,16 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return lastDataTime; } + /** + * The established model memory of the job, or null if model + * memory has not reached equilibrium yet. + * + * @return The established model memory of the job + */ + public Long getEstablishedModelMemory() { + return establishedModelMemory; + } + /** * The analysis configuration object * @@ -418,6 +438,23 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return new ArrayList<>(allFields); } + /** + * Make a best estimate of the job's memory footprint using the information available. + * If a job has an established model memory size, then this is the best estimate. + * Otherwise, assume the maximum model memory limit will eventually be required. + * In either case, a fixed overhead is added to account for the memory required by the + * program code and stack. + * @return an estimate of the memory requirement of this job, in bytes + */ + public long estimateMemoryFootprint() { + if (establishedModelMemory != null && establishedModelMemory > 0) { + return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes(); + } + Long modelMemoryLimit = (analysisLimits != null) ? analysisLimits.getModelMemoryLimit() : null; + return ByteSizeUnit.MB.toBytes((modelMemoryLimit != null) ? modelMemoryLimit : JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT) + + PROCESS_MEMORY_OVERHEAD.getBytes(); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); @@ -447,6 +484,10 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } else { out.writeBoolean(false); } + // TODO: set to V_6_1_0 after backporting + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(establishedModelMemory); + } analysisConfig.writeTo(out); out.writeOptionalWriteable(analysisLimits); out.writeOptionalWriteable(dataDescription); @@ -492,6 +533,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + humanReadableSuffix, lastDataTime.getTime()); } + if (establishedModelMemory != null) { + builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); + } builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); if (analysisLimits != null) { builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params); @@ -546,6 +590,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) && Objects.equals(this.lastDataTime, that.lastDataTime) + && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) && Objects.equals(this.modelPlotConfig, that.modelPlotConfig) @@ -561,8 +606,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO @Override public int hashCode() { - return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, analysisConfig, - analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, + return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory, + analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted); } @@ -605,6 +650,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private Date createTime; private Date finishedTime; private Date lastDataTime; + private Long establishedModelMemory; private ModelPlotConfig modelPlotConfig; private Long renormalizationWindowDays; private TimeValue backgroundPersistInterval; @@ -634,6 +680,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); this.lastDataTime = job.getLastDataTime(); + this.establishedModelMemory = job.getEstablishedModelMemory(); this.modelPlotConfig = job.getModelPlotConfig(); this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); @@ -660,6 +707,10 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO createTime = in.readBoolean() ? new Date(in.readVLong()) : null; finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null; + // TODO: set to V_6_1_0 after backporting + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + establishedModelMemory = in.readOptionalLong(); + } analysisConfig = in.readOptionalWriteable(AnalysisConfig::new); analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); dataDescription = in.readOptionalWriteable(DataDescription::new); @@ -699,10 +750,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.groups = groups == null ? Collections.emptyList() : groups; } - public Date getCreateTime() { - return createTime; - } - public Builder setCustomSettings(Map customSettings) { this.customSettings = customSettings; return this; @@ -742,6 +789,11 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return this; } + public Builder setEstablishedModelMemory(Long establishedModelMemory) { + this.establishedModelMemory = establishedModelMemory; + return this; + } + public Builder setDataDescription(DataDescription.Builder description) { dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); return this; @@ -844,6 +896,10 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } else { out.writeBoolean(false); } + // TODO: set to V_6_1_0 after backporting + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(establishedModelMemory); + } out.writeOptionalWriteable(analysisConfig); out.writeOptionalWriteable(analysisLimits); out.writeOptionalWriteable(dataDescription); @@ -880,6 +936,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO if (lastDataTime != null) { builder.field(LAST_DATA_TIME.getPreferredName(), lastDataTime.getTime()); } + if (establishedModelMemory != null) { + builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); + } if (analysisConfig != null) { builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params); } @@ -937,6 +996,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.createTime, that.createTime) && Objects.equals(this.finishedTime, that.finishedTime) && Objects.equals(this.lastDataTime, that.lastDataTime) + && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.modelPlotConfig, that.modelPlotConfig) && Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays) && Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval) @@ -951,8 +1011,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO @Override public int hashCode() { return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime, - finishedTime, lastDataTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, - modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted); + finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, + backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, + resultsIndexName, deleted); } /** @@ -1048,6 +1109,11 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public Job build(Date createTime) { setCreateTime(createTime); setJobVersion(Version.CURRENT); + // TODO: Maybe we _could_ accept a value for this supplied at create time - it would + // mean cloned jobs that hadn't been edited much would start with an accurate expected size. + // But on the other hand it would mean jobs that were cloned and then completely changed + // would start with a size that was completely wrong. + setEstablishedModelMemory(null); return build(); } @@ -1076,7 +1142,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } return new Job( - id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, + id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java index 66ff4a9a832..7c41af6bf4f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java @@ -48,6 +48,7 @@ public class JobUpdate implements Writeable, ToXContentObject { PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); + PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); } /** @@ -56,7 +57,7 @@ public class JobUpdate implements Writeable, ToXContentObject { * If model_memory_limit is not defined for a job then the * job was created before 6.1 and a value of 4GB is assumed. */ - private static final long UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT = 4096; + static final long UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT = 4096; private final String jobId; private final List groups; @@ -71,13 +72,15 @@ public class JobUpdate implements Writeable, ToXContentObject { private final List categorizationFilters; private final Map customSettings; private final String modelSnapshotId; + private final Long establishedModelMemory; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval, @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, - @Nullable Map customSettings, @Nullable String modelSnapshotId) { + @Nullable Map customSettings, @Nullable String modelSnapshotId, + @Nullable Long establishedModelMemory) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -91,6 +94,7 @@ public class JobUpdate implements Writeable, ToXContentObject { this.categorizationFilters = categorisationFilters; this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; + this.establishedModelMemory = establishedModelMemory; } public JobUpdate(StreamInput in) throws IOException { @@ -120,6 +124,12 @@ public class JobUpdate implements Writeable, ToXContentObject { } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); + // TODO: set to V_6_1_0 after backporting + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + establishedModelMemory = in.readOptionalLong(); + } else { + establishedModelMemory = null; + } } @Override @@ -146,6 +156,10 @@ public class JobUpdate implements Writeable, ToXContentObject { } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); + // TODO: set to V_6_1_0 after backporting + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeOptionalLong(establishedModelMemory); + } } public String getJobId() { @@ -200,6 +214,10 @@ public class JobUpdate implements Writeable, ToXContentObject { return modelSnapshotId; } + public Long getEstablishedModelMemory() { + return establishedModelMemory; + } + public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || detectorUpdates != null; } @@ -244,6 +262,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (modelSnapshotId != null) { builder.field(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId); } + if (establishedModelMemory != null) { + builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); + } builder.endObject(); return builder; } @@ -344,7 +365,14 @@ public class JobUpdate implements Writeable, ToXContentObject { if (modelSnapshotId != null) { builder.setModelSnapshotId(modelSnapshotId); } - + if (establishedModelMemory != null) { + // An established model memory of zero means we don't actually know the established model memory + if (establishedModelMemory > 0) { + builder.setEstablishedModelMemory(establishedModelMemory); + } else { + builder.setEstablishedModelMemory(null); + } + } return builder.build(); } @@ -372,14 +400,15 @@ public class JobUpdate implements Writeable, ToXContentObject { && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.categorizationFilters, that.categorizationFilters) && Objects.equals(this.customSettings, that.customSettings) - && Objects.equals(this.modelSnapshotId, that.modelSnapshotId); + && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) + && Objects.equals(this.establishedModelMemory, that.establishedModelMemory); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId); + modelSnapshotId, establishedModelMemory); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -490,6 +519,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private List categorizationFilters; private Map customSettings; private String modelSnapshotId; + private Long establishedModelMemory; public Builder(String jobId) { this.jobId = jobId; @@ -560,10 +590,15 @@ public class JobUpdate implements Writeable, ToXContentObject { return this; } + public Builder setEstablishedModelMemory(Long establishedModelMemory) { + this.establishedModelMemory = establishedModelMemory; + return this; + } + public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId); + modelSnapshotId, establishedModelMemory); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 6d11d0c079e..ab008e69252 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -80,6 +80,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -92,7 +93,7 @@ public class JobProvider { private static final Logger LOGGER = Loggers.getLogger(JobProvider.class); private static final int RECORDS_SIZE_PARAM = 10000; - private static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20; + public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20; private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1; private final Client client; @@ -866,11 +867,16 @@ public class JobProvider { * BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE buckets, which is defined as having a coefficient of variation * of no more than ESTABLISHED_MEMORY_CV_THRESHOLD * @param jobId the id of the job for which established memory usage is required + * @param latestBucketTimestamp the latest bucket timestamp to be used for the calculation, if known, otherwise + * null, implying the latest bucket that exists in the results index + * @param latestModelSizeStats the latest model size stats for the job, if known, otherwise null - supplying + * these when available avoids one search * @param handler if the method succeeds, this will be passed the established memory usage (in bytes) of the - * specified job, or null if memory usage is not yet established + * specified job, or 0 if memory usage is not yet established * @param errorHandler if a problem occurs, the exception will be passed to this handler */ - public void getEstablishedMemoryUsage(String jobId, Consumer handler, Consumer errorHandler) { + public void getEstablishedMemoryUsage(String jobId, Date latestBucketTimestamp, ModelSizeStats latestModelSizeStats, + Consumer handler, Consumer errorHandler) { String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); @@ -894,7 +900,7 @@ public class JobProvider { long count = extendedStats.getCount(); if (count <= 0) { // model size stats haven't changed in the last N buckets, so the latest (older) ones are established - modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats), errorHandler); + handleLatestModelSizeStats(jobId, latestModelSizeStats, handler, errorHandler); } else if (count == 1) { // no need to do an extra search in the case of exactly one document being aggregated handler.accept((long) extendedStats.getAvg()); @@ -905,45 +911,46 @@ public class JobProvider { // is there sufficient stability in the latest model size stats readings? if (coefficientOfVaration <= ESTABLISHED_MEMORY_CV_THRESHOLD) { // yes, so return the latest model size as established - modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats), - errorHandler); + handleLatestModelSizeStats(jobId, latestModelSizeStats, handler, errorHandler); } else { // no - we don't have an established model size - handler.accept(null); + handler.accept(0L); } } } else { - handler.accept(null); + handler.accept(0L); } }, errorHandler )); } else { - handler.accept(null); + LOGGER.trace("[{}] Insufficient history to calculate established memory use", jobId); + handler.accept(0L); } }; // Step 1. Find the time span of the most recent N bucket results, where N is the number of buckets // required to consider memory usage "established" BucketsQueryBuilder bucketQuery = new BucketsQueryBuilder() + .end(latestBucketTimestamp != null ? Long.toString(latestBucketTimestamp.getTime() + 1) : null) .sortField(Result.TIMESTAMP.getPreferredName()) .sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1) .includeInterim(false); bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> { if (e instanceof ResourceNotFoundException) { - handler.accept(null); + handler.accept(0L); } else { errorHandler.accept(e); } }); } - /** - * A model size of 0 implies a completely uninitialised model. This method converts 0 to null - * before calling a handler. - */ - private static void handleModelBytesOrNull(Consumer handler, ModelSizeStats modelSizeStats) { - long modelBytes = modelSizeStats.getModelBytes(); - handler.accept(modelBytes > 0 ? modelBytes : null); + private void handleLatestModelSizeStats(String jobId, ModelSizeStats latestModelSizeStats, Consumer handler, + Consumer errorHandler) { + if (latestModelSizeStats != null) { + handler.accept(latestModelSizeStats.getModelBytes()); + } else { + modelSizeStats(jobId, modelSizeStats -> handler.accept(modelSizeStats.getModelBytes()), errorHandler); + } } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index cf100f08dae..e54bf36588f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -234,8 +234,9 @@ public class JobResultsPersister extends AbstractComponent { /** * Persist a model snapshot description */ - public void persistModelSnapshot(ModelSnapshot modelSnapshot) { + public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); + persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } @@ -247,8 +248,6 @@ public class JobResultsPersister extends AbstractComponent { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); - // Don't commit as we expect masses of these updates and they're only - // for information at the API level } /** @@ -261,8 +260,6 @@ public class JobResultsPersister extends AbstractComponent { Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener); - // Don't commit as we expect masses of these updates and they're only - // for information at the API level } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 0a1585eed68..985525f3e72 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -381,7 +381,8 @@ public class AutodetectProcessManager extends AbstractComponent { autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); + client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), + autodetectParams.modelSnapshot() != null); ExecutorService autodetectWorkerExecutor; try { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 9a224647562..52c85eecd20 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; @@ -15,6 +16,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; @@ -31,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; import java.time.Duration; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -64,31 +67,39 @@ public class AutoDetectResultProcessor { private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; + private final JobProvider jobProvider; + private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1); private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; + private int bucketCount; // only used from the process() thread, so doesn't need to be volatile /** * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; + private volatile long latestEstablishedModelMemory; + private volatile boolean haveNewLatestModelSizeStats; public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, - ModelSizeStats latestModelSizeStats) { - this(client, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener()); + JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { + this(client, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener()); } AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, - ModelSizeStats latestModelSizeStats, FlushListener flushListener) { + JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot, + FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); + this.jobProvider = Objects.requireNonNull(jobProvider); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); + this.restoredSnapshot = restoredSnapshot; } public void process(AutodetectProcess process) { @@ -98,14 +109,13 @@ public class AutoDetectResultProcessor { // to kill the results reader thread as autodetect will be blocked // trying to write its output. try { - int bucketCount = 0; + bucketCount = 0; Iterator iterator = process.readAutodetectResults(); while (iterator.hasNext()) { try { AutodetectResult result = iterator.next(); processResult(context, result); if (result.getBucket() != null) { - bucketCount++; LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } } catch (Exception e) { @@ -174,6 +184,17 @@ public class AutoDetectResultProcessor { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); + ++bucketCount; + + // if we haven't previously set established model memory, consider trying again after + // a reasonable amount of time has elapsed since the last model size stats update + long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; + if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 + && bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { + persister.commitResultWrites(context.jobId); + updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats); + haveNewLatestModelSizeStats = false; + } } List records = result.getRecords(); if (records != null && !records.isEmpty()) { @@ -218,14 +239,21 @@ public class AutoDetectResultProcessor { modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()); latestModelSizeStats = modelSizeStats; + haveNewLatestModelSizeStats = true; persister.persistModelSizeStats(modelSizeStats); + // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets + // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and + // we'll NEVER consider memory usage to be established during this period + if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + // We need to make all results written up to and including these stats available for the established memory calculation + persister.commitResultWrites(context.jobId); + updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats); + } } ModelSnapshot modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { - persister.persistModelSnapshot(modelSnapshot); - // We need to refresh the index in order for the snapshot to be available when we'll try to - // update the job with it - persister.commitResultWrites(jobId); + // We need to refresh in order for the snapshot to be available when we try to update the job with it + persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); updateModelSnapshotIdOnJob(modelSnapshot); } Quantiles quantiles = result.getQuantiles(); @@ -286,6 +314,28 @@ public class AutoDetectResultProcessor { }); } + protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) { + jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> { + JobUpdate update = new JobUpdate.Builder(jobId) + .setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update); + + client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]", + e); + } + }); + }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); + } + public void awaitCompletion() throws TimeoutException { try { // Although the results won't take 30 minutes to finish, the pipe won't be closed diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java index 96d3787d1e4..0126114f1dc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java @@ -8,11 +8,15 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; + import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MachineLearningTests extends ESTestCase { @@ -67,6 +71,42 @@ public class MachineLearningTests extends ESTestCase { "it is reserved for machine learning. If your intention was to customize machine learning, set the [xpack.ml.")); } + public void testMachineMemory_givenStatsFailure() throws IOException { + OsStats stats = mock(OsStats.class); + when(stats.getMem()).thenReturn(new OsStats.Mem(-1, -1)); + assertEquals(-1L, MachineLearning.machineMemoryFromStats(stats)); + } + + public void testMachineMemory_givenNoCgroup() throws IOException { + OsStats stats = mock(OsStats.class); + when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L)); + assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats)); + } + + public void testMachineMemory_givenCgroupNullLimit() throws IOException { + OsStats stats = mock(OsStats.class); + when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L)); + when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3, + new OsStats.Cgroup.CpuStat(4, 5, 6), null, null, null)); + assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats)); + } + + public void testMachineMemory_givenCgroupNoLimit() throws IOException { + OsStats stats = mock(OsStats.class); + when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L)); + when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3, + new OsStats.Cgroup.CpuStat(4, 5, 6), "c", "18446744073709551615", "4796416")); + assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats)); + } + + public void testMachineMemory_givenCgroupLowLimit() throws IOException { + OsStats stats = mock(OsStats.class); + when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L)); + when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3, + new OsStats.Cgroup.CpuStat(4, 5, 6), "c", "7516192768", "4796416")); + assertEquals(7_516_192_768L, MachineLearning.machineMemoryFromStats(stats)); + } + private MachineLearning createMachineLearning(Settings settings) { return new MachineLearning(settings, mock(Environment.class), mock(XPackLicenseState.class)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index dfebb506517..997cb73cf34 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -23,6 +23,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -91,9 +93,10 @@ public class OpenJobActionTests extends ESTestCase { OpenJobAction.validate("job_id", mlBuilder.build()); } - public void testSelectLeastLoadedMlNode() { + public void testSelectLeastLoadedMlNode_byCount() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + // MachineLearning.MACHINE_MEMORY_NODE_ATTR not set, so this will fall back to allocating by count DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), nodeAttr, Collections.emptySet(), Version.CURRENT)) @@ -117,10 +120,49 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger); + assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } + public void testSelectLeastLoadedMlNode_byMemory() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "16000000000"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", JobState.fromString("opened"), tasksBuilder); + addJobTask("job_id2", "_node_id2", JobState.fromString("opened"), tasksBuilder); + addJobTask("job_id3", "_node_id2", JobState.fromString("opened"), tasksBuilder); + addJobTask("job_id4", "_node_id3", JobState.fromString("opened"), tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, jobId -> { + // remember we add 100MB for the process overhead, so these model memory + // limits correspond to estimated footprints of 102MB and 205MB + long jobSize = (jobId.equals("job_id2") || jobId.equals("job_id3")) ? 2 : 105; + return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(jobSize, ByteSizeUnit.MB)).build(new Date()); + }, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5"); + cs.nodes(nodes); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); + cs.metaData(metaData); + cs.routingTable(routingTable.build()); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id5", cs.build(), 2, 10, 30, logger); + assertEquals("", result.getExplanation()); + assertEquals("_node_id2", result.getExecutorNode()); + } + public void testSelectLeastLoadedMlNode_maxCapacity() { int numNodes = randomIntBetween(1, 10); int maxRunningJobsPerNode = randomIntBetween(1, 100); @@ -129,13 +171,15 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + String[] jobIds = new String[numNodes * maxRunningJobsPerNode]; for (int i = 0; i < numNodes; i++) { String nodeId = "_node_id" + i; TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); for (int j = 0; j < maxRunningJobsPerNode; j++) { - long id = j + (maxRunningJobsPerNode * i); - addJobTask("job_id" + id, nodeId, JobState.OPENED, tasksBuilder); + int id = j + (maxRunningJobsPerNode * i); + jobIds[id] = "job_id" + id; + addJobTask(jobIds[id], nodeId, JobState.OPENED, tasksBuilder); } } PersistentTasksCustomMetaData tasks = tasksBuilder.build(); @@ -143,12 +187,12 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id1", "job_id2"); + addJobAndIndices(metaData, routingTable, jobIds); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, 30, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -174,7 +218,7 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, 30, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -209,7 +253,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, 30, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -219,7 +263,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -230,7 +274,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -241,7 +285,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -276,7 +320,7 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -303,7 +347,7 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of version [" + Version.CURRENT + "]")); assertNull(result.getExecutorNode()); } @@ -402,7 +446,7 @@ public class OpenJobActionTests extends ESTestCase { } public void testMappingRequiresUpdateSomeVersionMix() throws IOException { - Map versionMix = new HashMap(); + Map versionMix = new HashMap<>(); versionMix.put("version_54", Version.V_5_4_0); versionMix.put("version_current", Version.CURRENT); versionMix.put("version_null", null); @@ -425,7 +469,8 @@ public class OpenJobActionTests extends ESTestCase { } private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { - addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date()), jobIds); + addJobAndIndices(metaData, routingTable, jobId -> + BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()), jobIds); } private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, Function jobCreator, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 685f28bb646..09253aac823 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -102,7 +102,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { jobProvider = new JobProvider(client(), builder.build()); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, new NoOpRenormalizer(), - new JobResultsPersister(nodeSettings(), client()), new ModelSizeStats.Builder(JOB_ID).build()) { + new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { @Override protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 9eb70508e91..7f73067cae4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -12,6 +12,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -44,19 +46,19 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.hasEntry; + public class BasicDistributedJobsIT extends BaseMlIntegTestCase { public void testFailOverBasics() throws Exception { internalCluster().ensureAtLeastNumDataNodes(4); ensureStableCluster(4); - Job.Builder job = createJob("fail-over-basics-job"); + Job.Builder job = createJob("fail-over-basics-job", new ByteSizeValue(2, ByteSizeUnit.MB)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); assertTrue(putJobResponse.isAcknowledged()); @@ -200,7 +202,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureStableCluster(3); String jobId = "dedicated-ml-node-job"; - Job.Builder job = createJob(jobId); + Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); assertTrue(putJobResponse.isAcknowledged()); @@ -213,10 +215,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { PersistentTask task = tasks.getTask(MlMetadata.jobTaskId(jobId)); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); - Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); - expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); - assertEquals(expectedNodeAttr, node.getAttributes()); + assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); + assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10")); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); assertNotNull(jobTaskStatus); assertEquals(JobState.OPENED, jobTaskStatus.getState()); @@ -284,7 +284,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { int numJobs = numMlNodes * 10; for (int i = 0; i < numJobs; i++) { - Job.Builder job = createJob(Integer.toString(i)); + Job.Builder job = createJob(Integer.toString(i), new ByteSizeValue(2, ByteSizeUnit.MB)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); assertTrue(putJobResponse.isAcknowledged()); @@ -401,10 +401,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertNotNull(task.getExecutorNode()); assertFalse(task.needsReassignment(clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); - Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); - expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); - assertEquals(expectedNodeAttr, node.getAttributes()); + assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); + assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10")); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); assertNotNull(jobTaskStatus); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index cc95031a9cc..d631e79c7fb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -6,10 +6,12 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.junit.After; @@ -66,6 +68,13 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { // This is the key assertion: if renormalization never happened then the record_score would // be the same as the initial_record_score on the anomaly record that happened earlier assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore())); + + // Since this job ran for 50 buckets, it's a good place to assert + // that established model memory matches model memory in the job stats + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index d97eb709551..ea2dbb4df2a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; @@ -20,6 +21,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -85,6 +87,13 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { }, 60, TimeUnit.SECONDS); waitUntilJobIsClosed(job.getId()); + + // Since this job ran for 168 buckets, it's a good place to assert + // that established model memory matches model memory in the job stats + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); } public void testRealtime() throws Exception { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index 503f31548be..4ee1ad9523b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.nullValue; public class EstablishedMemUsageIT extends BaseMlIntegTestCase { @@ -42,7 +41,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { initClusterAndJob(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); } public void testEstablishedMem_givenNoStatsLongHistory() throws Exception { @@ -53,7 +52,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 25); jobResultsPersister.commitResultWrites(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); } public void testEstablishedMem_givenNoStatsShortHistory() throws Exception { @@ -64,7 +63,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 5); jobResultsPersister.commitResultWrites(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); } public void testEstablishedMem_givenHistoryTooShort() throws Exception { @@ -74,10 +73,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 19); createModelSizeStats(jobId, 1, 19000L); - createModelSizeStats(jobId, 10, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L); jobResultsPersister.commitResultWrites(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); + assertThat(queryEstablishedMemoryUsage(jobId, 19, latestModelSizeStats), equalTo(0L)); } public void testEstablishedMem_givenHistoryJustEnoughLowVariation() throws Exception { @@ -87,10 +87,25 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 20); createModelSizeStats(jobId, 1, 19000L); - createModelSizeStats(jobId, 10, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L); jobResultsPersister.commitResultWrites(jobId); assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L)); + assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(20000L)); + } + + public void testEstablishedMem_givenHistoryJustEnoughAndUninitialized() throws Exception { + String jobId = "just-enough-low-cv-established-mem-job"; + + initClusterAndJob(jobId); + + createBuckets(jobId, 20); + createModelSizeStats(jobId, 1, 0L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L); + jobResultsPersister.commitResultWrites(jobId); + + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); + assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L)); } public void testEstablishedMem_givenHistoryJustEnoughHighVariation() throws Exception { @@ -100,10 +115,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 20); createModelSizeStats(jobId, 1, 1000L); - createModelSizeStats(jobId, 10, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L); jobResultsPersister.commitResultWrites(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); + assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L)); } public void testEstablishedMem_givenLongEstablished() throws Exception { @@ -113,10 +129,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 25); createModelSizeStats(jobId, 1, 10000L); - createModelSizeStats(jobId, 2, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 2, 20000L); jobResultsPersister.commitResultWrites(jobId); assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L)); + assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L)); } public void testEstablishedMem_givenOneRecentChange() throws Exception { @@ -126,10 +143,24 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createBuckets(jobId, 25); createModelSizeStats(jobId, 1, 10000L); - createModelSizeStats(jobId, 10, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L); jobResultsPersister.commitResultWrites(jobId); assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L)); + assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L)); + } + + public void testEstablishedMem_givenOneRecentChangeOnlyAndUninitialized() throws Exception { + String jobId = "one-recent-change-established-mem-job"; + + initClusterAndJob(jobId); + + createBuckets(jobId, 25); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L); + jobResultsPersister.commitResultWrites(jobId); + + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); + assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(0L)); } public void testEstablishedMem_givenOneRecentChangeOnly() throws Exception { @@ -138,10 +169,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { initClusterAndJob(jobId); createBuckets(jobId, 25); - createModelSizeStats(jobId, 10, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L); jobResultsPersister.commitResultWrites(jobId); assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L)); + assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L)); } public void testEstablishedMem_givenHistoricHighVariationRecentLowVariation() throws Exception { @@ -155,10 +187,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createModelSizeStats(jobId, 10, 6000L); createModelSizeStats(jobId, 19, 9000L); createModelSizeStats(jobId, 30, 19000L); - createModelSizeStats(jobId, 35, 20000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 20000L); jobResultsPersister.commitResultWrites(jobId); assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L)); + assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(20000L)); } public void testEstablishedMem_givenHistoricLowVariationRecentHighVariation() throws Exception { @@ -172,10 +205,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { createModelSizeStats(jobId, 25, 21000L); createModelSizeStats(jobId, 27, 39000L); createModelSizeStats(jobId, 30, 67000L); - createModelSizeStats(jobId, 35, 95000L); + ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 95000L); jobResultsPersister.commitResultWrites(jobId); - assertThat(queryEstablishedMemoryUsage(jobId), nullValue()); + assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L)); + assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(0L)); } private void initClusterAndJob(String jobId) { @@ -197,21 +231,28 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { builder.executeRequest(); } - private void createModelSizeStats(String jobId, int bucketNum, long modelBytes) { - ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId); - modelSizeStats.setTimestamp(new Date(bucketSpan * bucketNum)); - modelSizeStats.setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000))); - modelSizeStats.setModelBytes(modelBytes); - jobResultsPersister.persistModelSizeStats(modelSizeStats.build()); + private ModelSizeStats createModelSizeStats(String jobId, int bucketNum, long modelBytes) { + ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(jobId) + .setTimestamp(new Date(bucketSpan * bucketNum)) + .setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000))) + .setModelBytes(modelBytes).build(); + jobResultsPersister.persistModelSizeStats(modelSizeStats); + return modelSizeStats; } private Long queryEstablishedMemoryUsage(String jobId) throws Exception { + return queryEstablishedMemoryUsage(jobId, null, null); + } + + private Long queryEstablishedMemoryUsage(String jobId, Integer bucketNum, ModelSizeStats latestModelSizeStats) + throws Exception { AtomicReference establishedModelMemoryUsage = new AtomicReference<>(); AtomicReference exception = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.getEstablishedMemoryUsage(jobId, memUse -> { + Date latestBucketTimestamp = (bucketNum != null) ? new Date(bucketSpan * bucketNum) : null; + jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> { establishedModelMemoryUsage.set(memUse); latch.countDown(); }, e -> { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java index b100d06db9f..ff1f44fa215 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.FaultDetection; import org.elasticsearch.index.query.QueryBuilders; @@ -54,7 +56,7 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase { internalCluster().ensureAtLeastNumDataNodes(5); ensureStableCluster(5); - Job.Builder job = createJob("relocation-job"); + Job.Builder job = createJob("relocation-job", new ByteSizeValue(2, ByteSizeUnit.MB)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); assertTrue(putJobResponse.isAcknowledged()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java index 3b63a5b1e99..9cb5c8c6e92 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java @@ -7,12 +7,14 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction; import org.elasticsearch.xpack.ml.action.util.PageParams; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.junit.After; import java.util.ArrayList; @@ -97,6 +99,13 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase { GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet(); assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L)); } + + // Since this job ran for 3000 buckets, it's a good place to assert + // that established model memory matches model memory in the job stats + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); } private static Map createRecord(long timestamp) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java index daa035405c2..3ff29e6af12 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -11,11 +11,13 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.junit.After; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,12 +107,21 @@ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase { assertThat(getBuckets(splitJob.getId()).size(), equalTo(oneGoBuckets.size())); assertThat(getRecords(oneGoJob.getId()).isEmpty(), is(true)); assertThat(getRecords(splitJob.getId()).isEmpty(), is(true)); + + // Since these jobs ran for 72 buckets, it's a good place to assert + // that established model memory matches model memory in the job stats + for (Job.Builder job : Arrays.asList(oneGoJob, splitJob)) { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + } } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { Detector.Builder detector = new Detector.Builder("mean", "value"); detector.setByFieldName("by_field"); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 3691cdf903e..130952d9606 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -9,7 +9,11 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; @@ -28,7 +32,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { startMlCluster(1, 1); // create and open first job, which succeeds: - Job.Builder job = createJob("close-failed-job-1"); + Job.Builder job = createJob("close-failed-job-1", new ByteSizeValue(2, ByteSizeUnit.MB)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); @@ -40,7 +44,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { }); // create and try to open second job, which fails: - job = createJob("close-failed-job-2"); + job = createJob("close-failed-job-2", new ByteSizeValue(2, ByteSizeUnit.MB)); putJobRequest = new PutJobAction.Request(job); putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); @@ -60,18 +64,23 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { } public void testSingleNode() throws Exception { - verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 32)); + verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100)); } public void testMultipleNodes() throws Exception { - verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 32)); + verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 100)); } private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception { startMlCluster(numNodes, maxNumberOfJobsPerNode); + long maxMlMemoryPerNode = calculateMaxMlMemory(); + ByteSizeValue jobModelMemoryLimit = new ByteSizeValue(2, ByteSizeUnit.MB); + long memoryFootprintPerJob = jobModelMemoryLimit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + long maxJobsPerNodeDueToMemoryLimit = maxMlMemoryPerNode / memoryFootprintPerJob; int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode; + boolean expectMemoryLimitBeforeCountLimit = maxJobsPerNodeDueToMemoryLimit < maxNumberOfJobsPerNode; for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) { - Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i)); + Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i), jobModelMemoryLimit); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); @@ -86,9 +95,19 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { }); logger.info("Opened {}th job", i); } catch (ElasticsearchStatusException e) { - assertTrue(e.getMessage(), e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation")); - assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" + maxNumberOfJobsPerNode + - "], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]")); + assertTrue(e.getMessage(), + e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation")); + if (expectMemoryLimitBeforeCountLimit) { + int expectedJobsAlreadyOpenOnNode = (i - 1) / numNodes; + assertTrue(e.getMessage(), + e.getMessage().endsWith("because this node has insufficient available memory. Available memory for ML [" + + maxMlMemoryPerNode + "], memory required by existing jobs [" + + (expectedJobsAlreadyOpenOnNode * memoryFootprintPerJob) + + "], estimated memory required for this job [" + memoryFootprintPerJob + "]]")); + } else { + assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" + + maxNumberOfJobsPerNode + "], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]")); + } logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i); // close the first job and check if the latest job gets opened: @@ -122,4 +141,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { ensureStableCluster(numNodes); } + private long calculateMaxMlMemory() { + Settings settings = internalCluster().getInstance(Settings.class); + return Long.parseLong(internalCluster().getInstance(TransportService.class).getLocalNode().getAttributes() + .get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)) * MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings) / 100; + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index f27cdfd9df4..f28cee0e80e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -431,9 +431,10 @@ public class JobTests extends AbstractSerializingTestCase { public void testBuilder_buildWithCreateTime() { Job.Builder builder = buildJobBuilder("foo"); Date now = new Date(); - Job job = builder.build(now); + Job job = builder.setEstablishedModelMemory(randomNonNegativeLong()).build(now); assertEquals(now, job.getCreateTime()); assertEquals(Version.CURRENT, job.getJobVersion()); + assertNull(job.getEstablishedModelMemory()); } public void testJobWithoutVersion() throws IOException { @@ -516,6 +517,39 @@ public class JobTests extends AbstractSerializingTestCase { assertThat(e.getMessage(), containsString("Invalid group id '$$$'")); } + public void testEstimateMemoryFootprint_GivenEstablished() { + Job.Builder builder = buildJobBuilder("established"); + long establishedModelMemory = randomIntBetween(10_000, 2_000_000_000); + builder.setEstablishedModelMemory(establishedModelMemory); + if (randomBoolean()) { + builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), null)); + } + assertEquals(establishedModelMemory + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); + } + + public void testEstimateMemoryFootprint_GivenLimitAndNotEstablished() { + Job.Builder builder = buildJobBuilder("limit"); + if (rarely()) { + // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() + // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. + builder.setEstablishedModelMemory(0L); + } + ByteSizeValue limit = new ByteSizeValue(randomIntBetween(100, 10000), ByteSizeUnit.MB); + builder.setAnalysisLimits(new AnalysisLimits(limit.getMb(), null)); + assertEquals(limit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint()); + } + + public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() { + Job.Builder builder = buildJobBuilder("nolimit"); + if (rarely()) { + // An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory() + // will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0. + builder.setEstablishedModelMemory(0L); + } + assertEquals(ByteSizeUnit.MB.toBytes(JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), + builder.build().estimateMemoryFootprint()); + } + public static Job.Builder buildJobBuilder(String id, Date date) { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(date); @@ -566,6 +600,9 @@ public class JobTests extends AbstractSerializingTestCase { if (randomBoolean()) { builder.setLastDataTime(new Date(randomNonNegativeLong())); } + if (randomBoolean()) { + builder.setEstablishedModelMemory(randomNonNegativeLong()); + } builder.setAnalysisConfig(AnalysisConfigTests.createRandomized()); builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java index d8957fc57bd..fa03921f282 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java @@ -85,6 +85,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase { if (randomBoolean()) { update.setModelSnapshotId(randomAlphaOfLength(10)); } + if (randomBoolean()) { + update.setEstablishedModelMemory(randomNonNegativeLong()); + } return update.build(); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 0b010112eba..c1a8066d408 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; @@ -33,11 +35,13 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -55,6 +59,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private Client client; private Renormalizer renormalizer; private JobResultsPersister persister; + private JobProvider jobProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; @@ -63,12 +68,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase { client = mock(Client.class); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); + jobProvider = mock(JobProvider.class); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, - new ModelSizeStats.Builder(JOB_ID).build(), flushListener); + processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, jobProvider, + new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener); } public void testProcess() throws TimeoutException { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); AutodetectResult autodetectResult = mock(AutodetectResult.class); @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); @@ -259,6 +267,36 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verifyNoMoreInteractions(persister); + // No interactions with the jobProvider confirms that the established memory calculation did not run + verifyNoMoreInteractions(jobProvider); + assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); + } + + public void testProcessResult_modelSizeStatsAfterManyBuckets() { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + context.deleteInterimRequired = false; + for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { + AutodetectResult result = mock(AutodetectResult.class); + Bucket bucket = mock(Bucket.class); + when(result.getBucket()).thenReturn(bucket); + processorUnderTest.processResult(context, result); + } + + AutodetectResult result = mock(AutodetectResult.class); + ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + + verify(persister, times(1)).persistModelSizeStats(modelSizeStats); + verify(persister, times(1)).commitResultWrites(JOB_ID); + verifyNoMoreInteractions(persister); + verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), isNull(Date.class), eq(modelSizeStats), + any(Consumer.class), any(Consumer.class)); + verifyNoMoreInteractions(jobProvider); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); } @@ -273,12 +311,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(result.getModelSnapshot()).thenReturn(modelSnapshot); processorUnderTest.processResult(context, result); - verify(persister, times(1)).persistModelSnapshot(modelSnapshot); + verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID, new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); - verify(persister).commitResultWrites(JOB_ID); verifyNoMoreInteractions(persister); } @@ -301,6 +338,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase { } public void testAwaitCompletion() throws TimeoutException { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); AutodetectResult autodetectResult = mock(AutodetectResult.class); @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); @@ -316,6 +355,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase { } public void testPersisterThrowingDoesntBlockProcessing() { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); AutodetectResult autodetectResult = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = mock(ModelSnapshot.class); when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot); @@ -329,10 +370,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(process.isProcessAliveAfterWaiting()).thenReturn(true); when(process.readAutodetectResults()).thenReturn(iterator); - doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any()); + doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any()); processorUnderTest.process(process); - verify(persister, times(2)).persistModelSnapshot(any()); + verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE)); } public void testParsingErrorSetsFailed() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 1e35ea10179..c79e30de2db 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.indices.recovery.RecoveryState; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; @@ -118,6 +120,10 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } protected Job.Builder createJob(String id) { + return createJob(id, null); + } + + protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); @@ -127,13 +133,19 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { Job.Builder builder = new Job.Builder(); builder.setId(id); - + if (modelMemoryLimit != null) { + builder.setAnalysisLimits(new AnalysisLimits(modelMemoryLimit.getMb(), null)); + } builder.setAnalysisConfig(analysisConfig); builder.setDataDescription(dataDescription); return builder; } public static Job.Builder createFareQuoteJob(String id) { + return createFareQuoteJob(id, null); + } + + public static Job.Builder createFareQuoteJob(String id, ByteSizeValue modelMemoryLimit) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH); @@ -146,6 +158,9 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { Job.Builder builder = new Job.Builder(); builder.setId(id); + if (modelMemoryLimit != null) { + builder.setAnalysisLimits(new AnalysisLimits(modelMemoryLimit.getMb(), null)); + } builder.setAnalysisConfig(analysisConfig); builder.setDataDescription(dataDescription); return builder; From 941c0a570138579e4b4552a018fb4d487c900ed1 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 21 Nov 2017 11:42:39 +0100 Subject: [PATCH 2/2] Cross Cluster Search: optionally skip disconnected clusters (elastic/x-pack-elasticsearch#2823) Original commit: elastic/x-pack-elasticsearch@3b0017df1f8a33029355ee0f70b678fceba38609 --- .../security/InternalClientIntegTests.java | 3 ++- .../xpack/watcher/WatcherServiceTests.java | 5 +++-- .../condition/CompareConditionSearchTests.java | 3 ++- .../condition/ScriptConditionTests.java | 18 ++++++++++++------ .../execution/TriggeredWatchStoreTests.java | 4 ++-- .../test/integration/SearchInputTests.java | 6 +++--- 6 files changed, 24 insertions(+), 15 deletions(-) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java index 660c7a29134..2c5418308b9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/InternalClientIntegTests.java @@ -75,7 +75,8 @@ public class InternalClientIntegTests extends ESSingleNodeTestCase { String scrollId = randomAlphaOfLength(5); SearchHit[] hits = new SearchHit[] {new SearchHit(1)}; InternalSearchResponse internalResponse = new InternalSearchResponse(new SearchHits(hits, 1, 1), null, null, null, false, false, 1); - SearchResponse response = new SearchResponse(internalResponse, scrollId, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY); + SearchResponse response = new SearchResponse(internalResponse, scrollId, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); Answer returnResponse = invocation -> { @SuppressWarnings("unchecked") diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 82233f46670..631d0fca24e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -140,7 +140,7 @@ public class WatcherServiceTests extends ESTestCase { // empty scroll response, no further scrolling needed SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1); SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10, - ShardSearchFailure.EMPTY_ARRAY); + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); // one search response containing active and inactive watches int count = randomIntBetween(2, 200); @@ -166,7 +166,8 @@ public class WatcherServiceTests extends ESTestCase { } SearchHits searchHits = new SearchHits(hits, count, 1.0f); SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); - SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY); + SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); // we do need to to use this kind of mocking because of the internal client, which calls doExecute at the end on the supplied // client instance diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/CompareConditionSearchTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/CompareConditionSearchTests.java index 0b97f11d9f2..dbd028d479a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/CompareConditionSearchTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/CompareConditionSearchTests.java @@ -84,7 +84,8 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC InternalSearchResponse internalSearchResponse = new InternalSearchResponse( new SearchHits(new SearchHit[]{hit}, 1L, 1f), null, null, null, false, false, 1); - SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_watch_name", new Payload.XContent(response)); assertThat(condition.execute(ctx).met(), is(true)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/ScriptConditionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/ScriptConditionTests.java index 4f3fc2a31a8..64df52ca37e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/ScriptConditionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/condition/ScriptConditionTests.java @@ -94,7 +94,8 @@ public class ScriptConditionTests extends ESTestCase { public void testExecute() throws Exception { ScriptCondition condition = new ScriptCondition(mockScript("ctx.payload.hits.total > 1"), scriptService); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response)); assertFalse(condition.execute(ctx).met()); } @@ -102,7 +103,8 @@ public class ScriptConditionTests extends ESTestCase { public void testExecuteMergedParams() throws Exception { Script script = new Script(ScriptType.INLINE, "mockscript", "ctx.payload.hits.total > threshold", singletonMap("threshold", 1)); ScriptCondition executable = new ScriptCondition(script, scriptService); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response)); assertFalse(executable.execute(ctx).met()); } @@ -115,7 +117,8 @@ public class ScriptConditionTests extends ESTestCase { parser.nextToken(); ScriptCondition executable = ScriptCondition.parse(scriptService, "_watch", parser); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response)); assertFalse(executable.execute(ctx).met()); @@ -179,7 +182,8 @@ public class ScriptConditionTests extends ESTestCase { public void testScriptConditionThrowException() throws Exception { ScriptCondition condition = new ScriptCondition( mockScript("null.foo"), scriptService); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response)); ScriptException exception = expectThrows(ScriptException.class, () -> condition.execute(ctx)); assertThat(exception.getMessage(), containsString("Error evaluating null.foo")); @@ -187,7 +191,8 @@ public class ScriptConditionTests extends ESTestCase { public void testScriptConditionReturnObjectThrowsException() throws Exception { ScriptCondition condition = new ScriptCondition(mockScript("return new Object()"), scriptService); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response)); Exception exception = expectThrows(IllegalStateException.class, () -> condition.execute(ctx)); assertThat(exception.getMessage(), @@ -197,7 +202,8 @@ public class ScriptConditionTests extends ESTestCase { public void testScriptConditionAccessCtx() throws Exception { ScriptCondition condition = new ScriptCondition(mockScript("ctx.trigger.scheduled_time.getMillis() < new Date().time"), scriptService); - SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]); + SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); WatchExecutionContext ctx = mockExecutionContext("_name", new DateTime(DateTimeZone.UTC), new Payload.XContent(response)); Thread.sleep(10); assertThat(condition.execute(ctx).met(), is(true)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 15329f01a65..76df32f1d2e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -199,8 +199,8 @@ public class TriggeredWatchStoreTests extends ESTestCase { hit.sourceRef(source); hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f); SearchResponse searchResponse2 = new SearchResponse( - new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null); - SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null); + new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null, null); + SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null, null); doAnswer(invocation -> { SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[0]; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java index 2ea2f0e8732..9c68238cabf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java @@ -81,7 +81,7 @@ public class SearchInputTests extends ESTestCase { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SearchRequest.class); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234, - ShardSearchFailure.EMPTY_ARRAY); + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); searchFuture.onResponse(searchResponse); when(client.search(requestCaptor.capture())).thenReturn(searchFuture); @@ -104,7 +104,7 @@ public class SearchInputTests extends ESTestCase { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SearchRequest.class); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234, - ShardSearchFailure.EMPTY_ARRAY); + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); searchFuture.onResponse(searchResponse); when(client.search(requestCaptor.capture())).thenReturn(searchFuture); @@ -146,7 +146,7 @@ public class SearchInputTests extends ESTestCase { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SearchRequest.class); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234, - ShardSearchFailure.EMPTY_ARRAY); + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); searchFuture.onResponse(searchResponse); when(client.search(requestCaptor.capture())).thenReturn(searchFuture);