Merge branch 'master' into feature/sql
Original commit: elastic/x-pack-elasticsearch@d11ddc7a2c
commit 61f13b9642
@@ -40,6 +40,13 @@ default behavior.
`xpack.ml.max_open_jobs`::
The maximum number of jobs that can run on a node. Defaults to `10`.

`xpack.ml.max_machine_memory_percent`::
The maximum percentage of the machine's memory that {ml} may use for running
analytics processes. (These processes are separate to the {es} JVM.) Defaults to
`30` percent. The limit is based on the total memory of the machine, not current
free memory. Jobs will not be allocated to a node if doing so would cause the
estimated memory use of {ml} jobs to exceed the limit.

`xpack.ml.max_model_memory_limit`::
The maximum `model_memory_limit` property value that can be set for any job on
this node. If you try to create a job with a `model_memory_limit` property value

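A quick worked illustration of how these settings interact (a sketch added for this write-up, not part of the commit; the machine size and job figures are invented): with `xpack.ml.max_machine_memory_percent` left at its default of 30, a 64 GB node gives {ml} roughly a 19.2 GB budget, and a new job is only assigned there if its estimated footprint fits into whatever part of that budget existing jobs have not already claimed.

// Hypothetical numbers only - shows the budget arithmetic behind the settings above.
public class MlMemoryBudgetExample {
    public static void main(String[] args) {
        long machineMemoryBytes = 64L * 1024 * 1024 * 1024; // assume a 64 GB node
        int maxMachineMemoryPercent = 30;                   // documented default
        long mlBudget = machineMemoryBytes * maxMachineMemoryPercent / 100; // ~19.2 GB

        long assignedJobMemory = 12L * 1024 * 1024 * 1024;  // estimated memory of jobs already open
        long estimatedFootprint = 8L * 1024 * 1024 * 1024;  // estimate for the job being opened

        boolean fits = estimatedFootprint <= mlBudget - assignedJobMemory;
        System.out.println("ML budget " + mlBudget + " bytes, new job fits: " + fits); // prints false
    }
}
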
@@ -23,11 +23,14 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
@@ -138,6 +141,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.security.InternalClient;

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -161,10 +165,13 @@ public class MachineLearning implements ActionPlugin {
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope);
public static final String ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
public static final Setting<ByteSizeValue> MAX_MODEL_MEMORY_LIMIT =
Setting.memorySizeSetting("xpack.ml.max_model_memory_limit", new ByteSizeValue(0), Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope);

public static final TimeValue STATE_PERSIST_RESTORE_TIMEOUT = TimeValue.timeValueMinutes(30);

@@ -195,6 +202,7 @@ public class MachineLearning implements ActionPlugin {
ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
MAX_MODEL_MEMORY_LIMIT,
MAX_MACHINE_MEMORY_PERCENT,
ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING,
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
@@ -206,9 +214,10 @@ public class MachineLearning implements ActionPlugin {
public Settings additionalSettings() {
String mlEnabledNodeAttrName = "node.attr." + ML_ENABLED_NODE_ATTR;
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR;
String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;

if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName);
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
return Settings.EMPTY;
}

@@ -219,8 +228,10 @@ public class MachineLearning implements ActionPlugin {
addMlNodeAttribute(additionalSettings, mlEnabledNodeAttrName, "true");
addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
} else {
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName);
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
}
return additionalSettings.build();
}
@@ -504,4 +515,25 @@ public class MachineLearning implements ActionPlugin {
maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool");
return Arrays.asList(autoDetect, renormalizer, datafeed);
}

/**
* Find the memory size (in bytes) of the machine this node is running on.
* Takes container limits (as used by Docker for example) into account.
*/
static long machineMemoryFromStats(OsStats stats) {
long mem = stats.getMem().getTotal().getBytes();
OsStats.Cgroup cgroup = stats.getCgroup();
if (cgroup != null) {
String containerLimitStr = cgroup.getMemoryLimitInBytes();
if (containerLimitStr != null) {
BigInteger containerLimit = new BigInteger(containerLimitStr);
if ((containerLimit.compareTo(BigInteger.valueOf(mem)) < 0 && containerLimit.compareTo(BigInteger.ZERO) > 0)
// mem < 0 means the value couldn't be obtained for some reason
|| (mem < 0 && containerLimit.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) < 0)) {
mem = containerLimit.longValue();
}
}
}
return mem;
}
}

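The helper just above boils down to one rule: trust the cgroup limit whenever it is a real, lower bound than the OS total, or whenever the OS total could not be read at all. A standalone sketch of that decision (class and method names here are illustrative, not from the codebase):

import java.math.BigInteger;

class ContainerMemoryExample {
    // Same intent as machineMemoryFromStats: prefer a positive cgroup limit that is below the
    // OS total, and also use it when the OS total itself is unknown (reported negative).
    static long effectiveMemory(long osTotalBytes, String cgroupLimit) {
        long mem = osTotalBytes;
        if (cgroupLimit != null) {
            BigInteger limit = new BigInteger(cgroupLimit);
            boolean realAndLower = limit.signum() > 0 && limit.compareTo(BigInteger.valueOf(mem)) < 0;
            boolean osTotalUnknown = mem < 0 && limit.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) < 0;
            if (realAndLower || osTotalUnknown) {
                mem = limit.longValue();
            }
        }
        return mem;
    }

    public static void main(String[] args) {
        // A container capped at 2 GiB on a 16 GiB host is treated as a 2 GiB machine.
        System.out.println(effectiveMemory(16L << 30, Long.toString(2L << 30))); // 2147483648
    }
}
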
@@ -77,6 +77,7 @@ import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -573,6 +574,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
*/
private final int fallbackMaxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;

public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager) {
@@ -580,14 +582,17 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
this.autodetectProcessManager = autodetectProcessManager;
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
}

@Override
public Assignment getAssignment(JobParams params, ClusterState clusterState) {
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs,
logger);
maxMachineMemoryPercent, logger);
}

@Override
@@ -597,7 +602,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
OpenJobAction.validate(params.getJobId(), mlMetadata);
Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations,
fallbackMaxNumberOfOpenJobs, logger);
fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger);
if (assignment.getExecutorNode() == null) {
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
@@ -630,6 +635,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}

void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_MACHINE_MEMORY_PERCENT.getKey(),
this.maxMachineMemoryPercent, maxMachineMemoryPercent);
this.maxMachineMemoryPercent = maxMachineMemoryPercent;
}
}

/**
@@ -655,7 +666,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}

static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
int fallbackMaxNumberOfOpenJobs, Logger logger) {
int fallbackMaxNumberOfOpenJobs, int maxMachineMemoryPercent, Logger logger) {
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
@@ -664,9 +675,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return new Assignment(null, reason);
}

long maxAvailable = Long.MIN_VALUE;
List<String> reasons = new LinkedList<>();
DiscoveryNode minLoadedNode = null;
long maxAvailableCount = Long.MIN_VALUE;
long maxAvailableMemory = Long.MIN_VALUE;
DiscoveryNode minLoadedNodeByCount = null;
DiscoveryNode minLoadedNodeByMemory = null;
// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
boolean allocateByMemory = true;
PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
@@ -697,22 +713,26 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}

long numberOfAssignedJobs;
int numberOfAllocatingJobs;
long numberOfAssignedJobs = 0;
int numberOfAllocatingJobs = 0;
long assignedJobMemory = 0;
if (persistentTasks != null) {
numberOfAssignedJobs = persistentTasks.getNumberOfTasksOnNode(node.getId(), OpenJobAction.TASK_NAME);
numberOfAllocatingJobs = persistentTasks.findTasks(OpenJobAction.TASK_NAME, task -> {
if (node.getId().equals(task.getExecutorNode()) == false) {
return false;
// find all the job tasks assigned to this node
Collection<PersistentTask<?>> assignedTasks = persistentTasks.findTasks(OpenJobAction.TASK_NAME,
task -> node.getId().equals(task.getExecutorNode()));
numberOfAssignedJobs = assignedTasks.size();
for (PersistentTask<?> assignedTask : assignedTasks) {
JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
jobTaskState.isStatusStale(assignedTask)) {
++numberOfAllocatingJobs;
}
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
jobTaskState.isStatusStale(task); // previous executor node failed and
// current executor node didn't have the chance to set job status to OPENING
}).size();
} else {
numberOfAssignedJobs = 0;
numberOfAllocatingJobs = 0;
String assignedJobId = ((JobParams) assignedTask.getParams()).getJobId();
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);
assert assignedJob != null;
assignedJobMemory += assignedJob.estimateMemoryFootprint();
}
}
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs +
@@ -736,8 +756,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}
}
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (availableCount == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
@@ -746,11 +766,55 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}

if (maxAvailable < available) {
maxAvailable = available;
minLoadedNode = node;
if (maxAvailableCount < availableCount) {
maxAvailableCount = availableCount;
minLoadedNodeByCount = node;
}

String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
long machineMemory = -1;
// TODO: remove leniency and reject the node if the attribute is null in 7.0
if (machineMemoryStr != null) {
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because " +
MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
logger.trace(reason);
reasons.add(reason);
continue;
}
}

if (allocateByMemory) {
if (machineMemory > 0) {
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
long estimatedMemoryFootprint = job.estimateMemoryFootprint();
long availableMemory = maxMlMemory - assignedJobMemory;
if (estimatedMemoryFootprint > availableMemory) {
String reason = "Not opening job [" + jobId + "] on node [" + node +
"], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory +
"], memory required by existing jobs [" + assignedJobMemory +
"], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}

if (maxAvailableMemory < availableMemory) {
maxAvailableMemory = availableMemory;
minLoadedNodeByMemory = node;
}
} else {
// If we cannot get the available memory on any machine in
// the cluster, fall back to simply allocating by job count
allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
jobId, node);
}
}
}
DiscoveryNode minLoadedNode = allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount;
if (minLoadedNode != null) {
logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId);
return new Assignment(minLoadedNode.getId(), "");

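To make the new selection rule concrete (a sketch with invented node sizes, not code from the commit): each candidate's ML budget is machineMemory * maxMachineMemoryPercent / 100, the estimated memory of jobs already assigned there is subtracted, and the node with the largest remainder wins; only when a machine-memory figure is missing does the allocator fall back to comparing open-job counts as before.

// Hypothetical numbers showing how the memory-based pick can differ from a job-count pick.
public class NodeSelectionExample {
    public static void main(String[] args) {
        int maxMachineMemoryPercent = 30;
        long gb = 1L << 30;

        // Node A: 64 GB machine, 12 GB already claimed by assigned jobs -> 7.2 GB of budget left.
        long nodeAFree = 64 * gb * maxMachineMemoryPercent / 100 - 12 * gb;
        // Node B: 32 GB machine, 2 GB already claimed -> 7.6 GB of budget left.
        long nodeBFree = 32 * gb * maxMachineMemoryPercent / 100 - 2 * gb;

        // Node B wins despite being the smaller machine, mirroring minLoadedNodeByMemory above.
        String chosen = nodeBFree > nodeAFree ? "node B" : "node A";
        System.out.println(chosen + " selected (" + nodeAFree + " vs " + nodeBFree + " bytes free)");
    }
}
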
@@ -19,6 +19,9 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@@ -38,6 +41,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -245,6 +249,7 @@ public class JobManager extends AbstractComponent {
clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private volatile Job updatedJob;
private volatile boolean changeWasRequired;

@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
@@ -255,16 +260,33 @@ public class JobManager extends AbstractComponent {
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJobOrThrowIfUnknown(jobId, currentState);
updatedJob = jobUpdate.mergeWithJob(job, maxModelMemoryLimit);
if (updatedJob.equals(job)) {
// nothing to do
return currentState;
}
changeWasRequired = true;
return updateClusterState(updatedJob, true, currentState);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
PersistentTasksCustomMetaData persistentTasks =
newState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, persistentTasks);
if (jobState == JobState.OPENED) {
updateJobProcessNotifier.submitJobUpdate(jobUpdate);
if (changeWasRequired) {
PersistentTasksCustomMetaData persistentTasks =
newState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, persistentTasks);
if (jobState == JobState.OPENED) {
updateJobProcessNotifier.submitJobUpdate(jobUpdate);
}
} else {
logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return jsonBuilder.string();
} catch (IOException e) {
return "(unprintable due to " + e.getMessage() + ")";
}
});
}
}
});
@@ -330,9 +352,10 @@ public class JobManager extends AbstractComponent {
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
ModelSnapshot modelSnapshot) {

final ModelSizeStats modelSizeStats = modelSnapshot.getModelSizeStats();
final JobResultsPersister persister = new JobResultsPersister(settings, client);

// Step 2. After the model size stats is persisted, also persist the snapshot's quantiles and respond
// Step 3. After the model size stats is persisted, also persist the snapshot's quantiles and respond
// -------
CheckedConsumer<IndexResponse, Exception> modelSizeStatsResponseHandler = response -> {
persister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE,
@@ -344,21 +367,20 @@ public class JobManager extends AbstractComponent {
}, actionListener::onFailure));
};

// Step 1. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time
// Step 2. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time
// so that a search for the latest model size stats returns the reverted one.
// -------
CheckedConsumer<Boolean, Exception> updateHandler = response -> {
if (response) {
ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSnapshot.getModelSizeStats())
.setLogTime(new Date()).build();
ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSizeStats).setLogTime(new Date()).build();
persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(
modelSizeStatsResponseHandler::accept, actionListener::onFailure));
}
};

// Step 0. Kick off the chain of callbacks with the cluster state update
// Step 1. Do the cluster state update
// -------
clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(),
Consumer<Long> clusterStateHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(),
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) {

@Override
@@ -377,9 +399,15 @@ public class JobManager extends AbstractComponent {
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
Job.Builder builder = new Job.Builder(job);
builder.setModelSnapshotId(modelSnapshot.getSnapshotId());
builder.setEstablishedModelMemory(response);
return updateClusterState(builder.build(), true, currentState);
}
});

// Step 0. Find the appropriate established model memory for the reverted job
// -------
jobProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler,
actionListener::onFailure);
}

private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {

@@ -24,18 +24,15 @@ import java.util.concurrent.LinkedBlockingQueue;
import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Request;
import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Response;

public class UpdateJobProcessNotifier extends AbstractComponent
implements LocalNodeMasterListener {
public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {

private final Client client;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<JobUpdate> orderedJobUpdates =
new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<JobUpdate> orderedJobUpdates = new LinkedBlockingQueue<>(1000);

private volatile ThreadPool.Cancellable cancellable;

public UpdateJobProcessNotifier(Settings settings, Client client,
ClusterService clusterService, ThreadPool threadPool) {
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.client = client;
this.threadPool = threadPool;
@@ -62,12 +59,11 @@ public class UpdateJobProcessNotifier extends AbstractComponent
stop();
}

void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate,
TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
private void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
}

void stop() {
private void stop() {
orderedJobUpdates.clear();

ThreadPool.Cancellable cancellable = this.cancellable;
@@ -82,20 +78,26 @@ public class UpdateJobProcessNotifier extends AbstractComponent
return ThreadPool.Names.SAME;
}

void processNextUpdate() {
private void processNextUpdate() {
try {
JobUpdate jobUpdate = orderedJobUpdates.poll();
if (jobUpdate != null) {
executeRemoteJob(jobUpdate);
executeRemoteJobIfNecessary(jobUpdate);
}
} catch (Exception e) {
logger.error("Unable while processing next job update", e);
}
}

void executeRemoteJobIfNecessary(JobUpdate update) {
// Do nothing if the fields that the C++ needs aren't being updated
if (update.isAutodetectProcessUpdate()) {
executeRemoteJob(update);
}
}

void executeRemoteJob(JobUpdate update) {
Request request = new Request(update.getJobId(), update.getModelPlotConfig(),
update.getDetectorUpdates());
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, request,
new ActionListener<Response>() {
@Override
@@ -110,15 +112,12 @@ public class UpdateJobProcessNotifier extends AbstractComponent
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
logger.debug("Remote job [{}] not updated as it has been deleted",
update.getJobId());
} else if (e.getMessage().contains("because job [" + update.getJobId() +
"] is not open") && e instanceof ElasticsearchStatusException) {
logger.debug("Remote job [{}] not updated as it is no longer open",
update.getJobId());
logger.debug("Remote job [{}] not updated as it has been deleted", update.getJobId());
} else if (e.getMessage().contains("because job [" + update.getJobId() + "] is not open")
&& e instanceof ElasticsearchStatusException) {
logger.debug("Remote job [{}] not updated as it is no longer open", update.getJobId());
} else {
logger.error("Failed to update remote job [" + update.getJobId() + "]",
e);
logger.error("Failed to update remote job [" + update.getJobId() + "]", e);
}
}
});

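One consequence of the new executeRemoteJobIfNecessary gate, worth spelling out: an update that only records established_model_memory never reaches the native process, because JobUpdate.isAutodetectProcessUpdate() (see the JobUpdate changes below) is true only when model plot config or detector updates change. A small hedged fragment, assuming the xpack ML classes from this commit are on the classpath and "my-job" is an existing job id:

// Cluster-state-only updates are now swallowed before any transport call is made.
JobUpdate memoryOnlyUpdate = new JobUpdate.Builder("my-job").setEstablishedModelMemory(123_456_789L).build();
if (memoryOnlyUpdate.isAutodetectProcessUpdate()) {
    // executeRemoteJob(memoryOnlyUpdate) would run here - not reached for this update
} else {
    // nothing to tell the C++ process; the update stays in cluster state only
}
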
@@ -10,7 +10,6 @@ import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.spi.Message;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -69,6 +68,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField FINISHED_TIME = new ParseField("finished_time");
public static final ParseField LAST_DATA_TIME = new ParseField("last_data_time");
public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory");
public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config");
public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days");
public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval");
@@ -88,6 +88,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO

public static final int MAX_JOB_ID_LENGTH = 64;
public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB);

static {
PARSERS.put(MlParserType.METADATA, METADATA_PARSER);
@@ -127,6 +128,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]");
}, LAST_DATA_TIME, ValueType.VALUE);
parser.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY);
parser.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSERS.get(parserType), ANALYSIS_CONFIG);
parser.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSERS.get(parserType), ANALYSIS_LIMITS);
parser.declareObject(Builder::setDataDescription, DataDescription.PARSERS.get(parserType), DATA_DESCRIPTION);
@@ -159,6 +161,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private final Date createTime;
private final Date finishedTime;
private final Date lastDataTime;
private final Long establishedModelMemory;
private final AnalysisConfig analysisConfig;
private final AnalysisLimits analysisLimits;
private final DataDescription dataDescription;
@@ -173,7 +176,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private final boolean deleted;

private Job(String jobId, String jobType, Version jobVersion, List<String> groups, String description, Date createTime,
Date finishedTime, Date lastDataTime,
Date finishedTime, Date lastDataTime, Long establishedModelMemory,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
@@ -187,6 +190,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.createTime = createTime;
this.finishedTime = finishedTime;
this.lastDataTime = lastDataTime;
this.establishedModelMemory = establishedModelMemory;
this.analysisConfig = analysisConfig;
this.analysisLimits = analysisLimits;
this.dataDescription = dataDescription;
@@ -218,6 +222,12 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
createTime = new Date(in.readVLong());
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null;
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
}
analysisConfig = new AnalysisConfig(in);
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
dataDescription = in.readOptionalWriteable(DataDescription::new);
@@ -319,6 +329,16 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return lastDataTime;
}

/**
* The established model memory of the job, or <code>null</code> if model
* memory has not reached equilibrium yet.
*
* @return The established model memory of the job
*/
public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

/**
* The analysis configuration object
*
@@ -418,6 +438,23 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return new ArrayList<>(allFields);
}

/**
* Make a best estimate of the job's memory footprint using the information available.
* If a job has an established model memory size, then this is the best estimate.
* Otherwise, assume the maximum model memory limit will eventually be required.
* In either case, a fixed overhead is added to account for the memory required by the
* program code and stack.
* @return an estimate of the memory requirement of this job, in bytes
*/
public long estimateMemoryFootprint() {
if (establishedModelMemory != null && establishedModelMemory > 0) {
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
}
Long modelMemoryLimit = (analysisLimits != null) ? analysisLimits.getModelMemoryLimit() : null;
return ByteSizeUnit.MB.toBytes((modelMemoryLimit != null) ? modelMemoryLimit : JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT)
+ PROCESS_MEMORY_OVERHEAD.getBytes();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
@@ -447,6 +484,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
} else {
out.writeBoolean(false);
}
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalLong(establishedModelMemory);
}
analysisConfig.writeTo(out);
out.writeOptionalWriteable(analysisLimits);
out.writeOptionalWriteable(dataDescription);
@@ -492,6 +533,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + humanReadableSuffix,
lastDataTime.getTime());
}
if (establishedModelMemory != null) {
builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
if (analysisLimits != null) {
builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params);
@@ -546,6 +590,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.finishedTime, that.finishedTime)
&& Objects.equals(this.lastDataTime, that.lastDataTime)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.analysisConfig, that.analysisConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription)
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
@@ -561,8 +606,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, analysisConfig,
analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleted);
}
@@ -605,6 +650,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private Date createTime;
private Date finishedTime;
private Date lastDataTime;
private Long establishedModelMemory;
private ModelPlotConfig modelPlotConfig;
private Long renormalizationWindowDays;
private TimeValue backgroundPersistInterval;
@@ -634,6 +680,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.createTime = job.getCreateTime();
this.finishedTime = job.getFinishedTime();
this.lastDataTime = job.getLastDataTime();
this.establishedModelMemory = job.getEstablishedModelMemory();
this.modelPlotConfig = job.getModelPlotConfig();
this.renormalizationWindowDays = job.getRenormalizationWindowDays();
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
@@ -660,6 +707,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
createTime = in.readBoolean() ? new Date(in.readVLong()) : null;
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null;
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
establishedModelMemory = in.readOptionalLong();
}
analysisConfig = in.readOptionalWriteable(AnalysisConfig::new);
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
dataDescription = in.readOptionalWriteable(DataDescription::new);
@@ -699,10 +750,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.groups = groups == null ? Collections.emptyList() : groups;
}

public Date getCreateTime() {
return createTime;
}

public Builder setCustomSettings(Map<String, Object> customSettings) {
this.customSettings = customSettings;
return this;
@@ -742,6 +789,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}

public Builder setEstablishedModelMemory(Long establishedModelMemory) {
this.establishedModelMemory = establishedModelMemory;
return this;
}

public Builder setDataDescription(DataDescription.Builder description) {
dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
return this;
@@ -844,6 +896,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
} else {
out.writeBoolean(false);
}
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalLong(establishedModelMemory);
}
out.writeOptionalWriteable(analysisConfig);
out.writeOptionalWriteable(analysisLimits);
out.writeOptionalWriteable(dataDescription);
@@ -880,6 +936,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (lastDataTime != null) {
builder.field(LAST_DATA_TIME.getPreferredName(), lastDataTime.getTime());
}
if (establishedModelMemory != null) {
builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
if (analysisConfig != null) {
builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
}
@@ -937,6 +996,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.finishedTime, that.finishedTime)
&& Objects.equals(this.lastDataTime, that.lastDataTime)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
@@ -951,8 +1011,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
@Override
public int hashCode() {
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
finishedTime, lastDataTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval,
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted);
finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
resultsIndexName, deleted);
}

/**
@@ -1048,6 +1109,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public Job build(Date createTime) {
setCreateTime(createTime);
setJobVersion(Version.CURRENT);
// TODO: Maybe we _could_ accept a value for this supplied at create time - it would
// mean cloned jobs that hadn't been edited much would start with an accurate expected size.
// But on the other hand it would mean jobs that were cloned and then completely changed
// would start with a size that was completely wrong.
setEstablishedModelMemory(null);
return build();
}

@@ -1076,7 +1142,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}

return new Job(
id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime,
id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleted);

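As a worked example of estimateMemoryFootprint above (figures invented for illustration): once a job's model memory is established at, say, 300 MB, it is costed at 300 MB plus the fixed 100 MB PROCESS_MEMORY_OVERHEAD; a job with no established figure and no explicit limit falls back to the 4096 MB UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT, i.e. 4196 MB in total.

// Illustrative arithmetic only; the constants are the ones introduced in this commit.
long mb = 1024L * 1024;
long processOverheadBytes = 100 * mb;                       // PROCESS_MEMORY_OVERHEAD

long establishedModelMemory = 300 * mb;                     // job has reached equilibrium
long establishedEstimate = establishedModelMemory + processOverheadBytes;      // 400 MB

long undefinedLimitMb = 4096;                               // UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT
long fallbackEstimate = undefinedLimitMb * mb + processOverheadBytes;          // 4196 MB
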
@@ -48,6 +48,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
}

/**
@@ -56,7 +57,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
* If model_memory_limit is not defined for a job then the
* job was created before 6.1 and a value of 4GB is assumed.
*/
private static final long UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT = 4096;
static final long UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT = 4096;

private final String jobId;
private final List<String> groups;
@@ -71,13 +72,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final Long establishedModelMemory;

private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval,
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId) {
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Long establishedModelMemory) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@@ -91,6 +94,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.establishedModelMemory = establishedModelMemory;
}

public JobUpdate(StreamInput in) throws IOException {
@@ -120,6 +124,12 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
}
}

@Override
@@ -146,6 +156,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalLong(establishedModelMemory);
}
}

public String getJobId() {
@@ -200,6 +214,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
return modelSnapshotId;
}

public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}
@@ -244,6 +262,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (modelSnapshotId != null) {
builder.field(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.endObject();
return builder;
}
@@ -344,7 +365,14 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (modelSnapshotId != null) {
builder.setModelSnapshotId(modelSnapshotId);
}

if (establishedModelMemory != null) {
// An established model memory of zero means we don't actually know the established model memory
if (establishedModelMemory > 0) {
builder.setEstablishedModelMemory(establishedModelMemory);
} else {
builder.setEstablishedModelMemory(null);
}
}
return builder.build();
}

@@ -372,14 +400,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId);
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
}

@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
modelSnapshotId, establishedModelMemory);
}

public static class DetectorUpdate implements Writeable, ToXContentObject {
@@ -490,6 +519,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
private String modelSnapshotId;
private Long establishedModelMemory;

public Builder(String jobId) {
this.jobId = jobId;
@@ -560,10 +590,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
return this;
}

public Builder setEstablishedModelMemory(Long establishedModelMemory) {
this.establishedModelMemory = establishedModelMemory;
return this;
}

public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
modelSnapshotId, establishedModelMemory);
}
}
}

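The zero check in mergeWithJob pairs with the JobProvider change later in this commit: getEstablishedMemoryUsage now reports 0 rather than null when memory is not yet established, and mergeWithJob translates that back into "no value" on the job. A two-line sketch of the convention:

// 0 means "not yet established" on the wire; it must not be stored on the Job as a real value.
Long reportedByProvider = 0L;                                            // what the handler receives
Long storedOnJob = reportedByProvider > 0 ? reportedByProvider : null;   // what mergeWithJob keeps
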
@@ -80,6 +80,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -92,7 +93,7 @@ public class JobProvider {
private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);

private static final int RECORDS_SIZE_PARAM = 10000;
private static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1;

private final Client client;
@@ -866,11 +867,16 @@ public class JobProvider {
* <code>BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE</code> buckets, which is defined as having a coefficient of variation
* of no more than <code>ESTABLISHED_MEMORY_CV_THRESHOLD</code>
* @param jobId the id of the job for which established memory usage is required
* @param latestBucketTimestamp the latest bucket timestamp to be used for the calculation, if known, otherwise
* <code>null</code>, implying the latest bucket that exists in the results index
* @param latestModelSizeStats the latest model size stats for the job, if known, otherwise <code>null</code> - supplying
* these when available avoids one search
* @param handler if the method succeeds, this will be passed the established memory usage (in bytes) of the
* specified job, or <code>null</code> if memory usage is not yet established
* specified job, or 0 if memory usage is not yet established
* @param errorHandler if a problem occurs, the exception will be passed to this handler
*/
public void getEstablishedMemoryUsage(String jobId, Consumer<Long> handler, Consumer<Exception> errorHandler) {
public void getEstablishedMemoryUsage(String jobId, Date latestBucketTimestamp, ModelSizeStats latestModelSizeStats,
Consumer<Long> handler, Consumer<Exception> errorHandler) {

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);

@@ -894,7 +900,7 @@ public class JobProvider {
long count = extendedStats.getCount();
if (count <= 0) {
// model size stats haven't changed in the last N buckets, so the latest (older) ones are established
modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats), errorHandler);
handleLatestModelSizeStats(jobId, latestModelSizeStats, handler, errorHandler);
} else if (count == 1) {
// no need to do an extra search in the case of exactly one document being aggregated
handler.accept((long) extendedStats.getAvg());
@@ -905,45 +911,46 @@ public class JobProvider {
// is there sufficient stability in the latest model size stats readings?
if (coefficientOfVaration <= ESTABLISHED_MEMORY_CV_THRESHOLD) {
// yes, so return the latest model size as established
modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats),
errorHandler);
handleLatestModelSizeStats(jobId, latestModelSizeStats, handler, errorHandler);
} else {
// no - we don't have an established model size
handler.accept(null);
handler.accept(0L);
}
}
} else {
handler.accept(null);
handler.accept(0L);
}
}, errorHandler
));
} else {
handler.accept(null);
LOGGER.trace("[{}] Insufficient history to calculate established memory use", jobId);
handler.accept(0L);
}
};

// Step 1. Find the time span of the most recent N bucket results, where N is the number of buckets
// required to consider memory usage "established"
BucketsQueryBuilder bucketQuery = new BucketsQueryBuilder()
.end(latestBucketTimestamp != null ? Long.toString(latestBucketTimestamp.getTime() + 1) : null)
.sortField(Result.TIMESTAMP.getPreferredName())
.sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1)
.includeInterim(false);
bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> {
if (e instanceof ResourceNotFoundException) {
handler.accept(null);
handler.accept(0L);
} else {
errorHandler.accept(e);
}
});
}

/**
* A model size of 0 implies a completely uninitialised model. This method converts 0 to <code>null</code>
* before calling a handler.
*/
private static void handleModelBytesOrNull(Consumer<Long> handler, ModelSizeStats modelSizeStats) {
long modelBytes = modelSizeStats.getModelBytes();
handler.accept(modelBytes > 0 ? modelBytes : null);
private void handleLatestModelSizeStats(String jobId, ModelSizeStats latestModelSizeStats, Consumer<Long> handler,
Consumer<Exception> errorHandler) {
if (latestModelSizeStats != null) {
handler.accept(latestModelSizeStats.getModelBytes());
} else {
modelSizeStats(jobId, modelSizeStats -> handler.accept(modelSizeStats.getModelBytes()), errorHandler);
}
}

/**

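For context, "established" above means that the model size readings of the most recent BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE (20) buckets vary by no more than a coefficient of variation of ESTABLISHED_MEMORY_CV_THRESHOLD (0.1). A small numeric illustration with made-up readings:

// Readings hovering around 200 MB: the spread is tiny relative to the mean, so the job's
// memory usage counts as established and the latest model size is reported.
public class EstablishedMemoryExample {
    public static void main(String[] args) {
        double[] modelBytes = {200e6, 202e6, 199e6, 201e6, 198e6};
        double mean = java.util.Arrays.stream(modelBytes).average().orElse(0);
        double variance = java.util.Arrays.stream(modelBytes)
                .map(b -> (b - mean) * (b - mean)).average().orElse(0);
        double coefficientOfVariation = Math.sqrt(variance) / mean;   // about 0.007 here
        System.out.println(coefficientOfVariation <= 0.1);            // true -> established
    }
}
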
@@ -234,8 +234,9 @@ public class JobResultsPersister extends AbstractComponent {
/**
* Persist a model snapshot description
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot));
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
}

@@ -247,8 +248,6 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId());
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}

/**
@@ -261,8 +260,6 @@ public class JobResultsPersister extends AbstractComponent {
Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId());
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener);
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}

/**

@@ -381,7 +381,8 @@ public class AutodetectProcessManager extends AbstractComponent {
autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService,
() -> setJobState(jobTask, JobState.FAILED));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
autodetectParams.modelSnapshot() != null);
ExecutorService autodetectWorkerExecutor;
try {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;

@@ -15,6 +16,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;

@@ -31,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

@@ -64,31 +67,39 @@ public class AutoDetectResultProcessor {
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
private final JobProvider jobProvider;
private final boolean restoredSnapshot;

final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
private int bucketCount; // only used from the process() thread, so doesn't need to be volatile

/**
* New model size stats are read as the process is running
*/
private volatile ModelSizeStats latestModelSizeStats;
private volatile long latestEstablishedModelMemory;
private volatile boolean haveNewLatestModelSizeStats;

public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
ModelSizeStats latestModelSizeStats) {
this(client, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener());
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
this(client, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener());
}

AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
ModelSizeStats latestModelSizeStats, FlushListener flushListener) {
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
this.renormalizer = Objects.requireNonNull(renormalizer);
this.persister = Objects.requireNonNull(persister);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
this.restoredSnapshot = restoredSnapshot;
}

public void process(AutodetectProcess process) {

@@ -98,14 +109,13 @@ public class AutoDetectResultProcessor {
// to kill the results reader thread as autodetect will be blocked
// trying to write its output.
try {
int bucketCount = 0;
bucketCount = 0;
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(context, result);
if (result.getBucket() != null) {
bucketCount++;
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
}
} catch (Exception e) {

@@ -174,6 +184,17 @@ public class AutoDetectResultProcessor {
// persist after deleting interim results in case the new
// results are also interim
context.bulkResultsPersister.persistBucket(bucket).executeRequest();
++bucketCount;

// if we haven't previously set established model memory, consider trying again after
// a reasonable amount of time has elapsed since the last model size stats update
long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L;
if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0
&& bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats);
haveNewLatestModelSizeStats = false;
}
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {

@@ -218,14 +239,21 @@ public class AutoDetectResultProcessor {
modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus());

latestModelSizeStats = modelSizeStats;
haveNewLatestModelSizeStats = true;
persister.persistModelSizeStats(modelSizeStats);
// This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
// because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
// we'll NEVER consider memory usage to be established during this period
if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats);
}
}
ModelSnapshot modelSnapshot = result.getModelSnapshot();
if (modelSnapshot != null) {
persister.persistModelSnapshot(modelSnapshot);
// We need to refresh the index in order for the snapshot to be available when we'll try to
// update the job with it
persister.commitResultWrites(jobId);
// We need to refresh in order for the snapshot to be available when we try to update the job with it
persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
updateModelSnapshotIdOnJob(modelSnapshot);
}
Quantiles quantiles = result.getQuantiles();

@@ -286,6 +314,28 @@ public class AutoDetectResultProcessor {
});
}

protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
JobUpdate update = new JobUpdate.Builder(jobId)
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);

client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]",
e);
}
});
}, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e));
}

public void awaitCompletion() throws TimeoutException {
try {
// Although the results won't take 30 minutes to finish, the pipe won't be closed
@@ -8,11 +8,15 @@ package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MachineLearningTests extends ESTestCase {

@@ -67,6 +71,42 @@ public class MachineLearningTests extends ESTestCase {
"it is reserved for machine learning. If your intention was to customize machine learning, set the [xpack.ml."));
}

public void testMachineMemory_givenStatsFailure() throws IOException {
OsStats stats = mock(OsStats.class);
when(stats.getMem()).thenReturn(new OsStats.Mem(-1, -1));
assertEquals(-1L, MachineLearning.machineMemoryFromStats(stats));
}

public void testMachineMemory_givenNoCgroup() throws IOException {
OsStats stats = mock(OsStats.class);
when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L));
assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats));
}

public void testMachineMemory_givenCgroupNullLimit() throws IOException {
OsStats stats = mock(OsStats.class);
when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L));
when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3,
new OsStats.Cgroup.CpuStat(4, 5, 6), null, null, null));
assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats));
}

public void testMachineMemory_givenCgroupNoLimit() throws IOException {
OsStats stats = mock(OsStats.class);
when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L));
when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3,
new OsStats.Cgroup.CpuStat(4, 5, 6), "c", "18446744073709551615", "4796416"));
assertEquals(10_737_418_240L, MachineLearning.machineMemoryFromStats(stats));
}

public void testMachineMemory_givenCgroupLowLimit() throws IOException {
OsStats stats = mock(OsStats.class);
when(stats.getMem()).thenReturn(new OsStats.Mem(10_737_418_240L, 5_368_709_120L));
when(stats.getCgroup()).thenReturn(new OsStats.Cgroup("a", 1, "b", 2, 3,
new OsStats.Cgroup.CpuStat(4, 5, 6), "c", "7516192768", "4796416"));
assertEquals(7_516_192_768L, MachineLearning.machineMemoryFromStats(stats));
}

private MachineLearning createMachineLearning(Settings settings) {
return new MachineLearning(settings, mock(Environment.class), mock(XPackLicenseState.class));
}
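The new MachineLearningTests above pin down the expected behaviour of MachineLearning.machineMemoryFromStats(): report the OS total memory, prefer a cgroup memory limit when one is present and smaller, and pass -1 through when the stats probe failed. The implementation itself is not shown in this diff; the following is only an illustrative sketch of logic that would satisfy those tests (it assumes the OsStats.Mem.getTotal() and OsStats.Cgroup.getMemoryLimitInBytes() accessors and java.math.BigInteger):

    static long machineMemoryFromStats(OsStats stats) {
        // -1 when the OS probe could not determine the memory size
        long mem = stats.getMem().getTotal().getBytes();
        OsStats.Cgroup cgroup = stats.getCgroup();
        if (cgroup != null && cgroup.getMemoryLimitInBytes() != null) {
            // the cgroup limit is reported as a string because "no limit" is 2^64 - 1, which overflows a long
            BigInteger limit = new BigInteger(cgroup.getMemoryLimitInBytes());
            if (limit.compareTo(BigInteger.valueOf(mem)) < 0) {
                // a real, lower cgroup limit wins over the machine total
                mem = limit.longValueExact();
            }
        }
        return mem;
    }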
@@ -23,6 +23,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

@@ -91,9 +93,10 @@ public class OpenJobActionTests extends ESTestCase {
OpenJobAction.validate("job_id", mlBuilder.build());
}

public void testSelectLeastLoadedMlNode() {
public void testSelectLeastLoadedMlNode_byCount() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
// MachineLearning.MACHINE_MEMORY_NODE_ATTR not set, so this will fall back to allocating by count
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))

@@ -117,10 +120,49 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger);
assertEquals("", result.getExplanation());
assertEquals("_node_id3", result.getExecutorNode());
}

public void testSelectLeastLoadedMlNode_byMemory() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "16000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id1", "_node_id1", JobState.fromString("opened"), tasksBuilder);
addJobTask("job_id2", "_node_id2", JobState.fromString("opened"), tasksBuilder);
addJobTask("job_id3", "_node_id2", JobState.fromString("opened"), tasksBuilder);
addJobTask("job_id4", "_node_id3", JobState.fromString("opened"), tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();

ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, jobId -> {
// remember we add 100MB for the process overhead, so these model memory
// limits correspond to estimated footprints of 102MB and 205MB
long jobSize = (jobId.equals("job_id2") || jobId.equals("job_id3")) ? 2 : 105;
return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(jobSize, ByteSizeUnit.MB)).build(new Date());
}, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id5", cs.build(), 2, 10, 30, logger);
assertEquals("", result.getExplanation());
assertEquals("_node_id2", result.getExecutorNode());
}
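Worked through with the 100MB process overhead mentioned in the comment above, the memory-based selection in testSelectLeastLoadedMlNode_byMemory comes out as follows: every node advertises 16,000,000,000 bytes of machine memory via the ml.machine_memory attribute, of which 30 percent (about 4.8GB) may be used for ML; _node_id1 and _node_id3 each already hold one job with a 105MB limit (roughly a 205MB footprint), while _node_id2 holds two jobs with 2MB limits (roughly 102MB each, about 204MB in total). All three nodes have ample headroom, so the new job_id5 is expected to land on the node with the smallest estimated memory use, which is why the assertion names _node_id2.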
public void testSelectLeastLoadedMlNode_maxCapacity() {
int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = randomIntBetween(1, 100);

@@ -129,13 +171,15 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
String[] jobIds = new String[numNodes * maxRunningJobsPerNode];
for (int i = 0; i < numNodes; i++) {
String nodeId = "_node_id" + i;
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i);
nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT));
for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i);
addJobTask("job_id" + id, nodeId, JobState.OPENED, tasksBuilder);
int id = j + (maxRunningJobsPerNode * i);
jobIds[id] = "job_id" + id;
addJobTask(jobIds[id], nodeId, JobState.OPENED, tasksBuilder);
}
}
PersistentTasksCustomMetaData tasks = tasksBuilder.build();

@@ -143,12 +187,12 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
addJobAndIndices(metaData, routingTable, jobIds);
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, 30, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));

@@ -174,7 +218,7 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, 30, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
}

@@ -209,7 +253,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(metaData);

ClusterState cs = csBuilder.build();
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, 30, logger);
assertEquals("_node_id3", result.getExecutorNode());

tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);

@@ -219,7 +263,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));

@@ -230,7 +274,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));

@@ -241,7 +285,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}

@@ -276,7 +320,7 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
assertNull(result.getExecutorNode());
}

@@ -303,7 +347,7 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of version [" + Version.CURRENT + "]"));
assertNull(result.getExecutorNode());
}

@@ -402,7 +446,7 @@ public class OpenJobActionTests extends ESTestCase {
}

public void testMappingRequiresUpdateSomeVersionMix() throws IOException {
Map<String, Object> versionMix = new HashMap<String, Object>();
Map<String, Object> versionMix = new HashMap<>();
versionMix.put("version_54", Version.V_5_4_0);
versionMix.put("version_current", Version.CURRENT);
versionMix.put("version_null", null);

@@ -425,7 +469,8 @@ public class OpenJobActionTests extends ESTestCase {
}

private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {
addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date()), jobIds);
addJobAndIndices(metaData, routingTable, jobId ->
BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()), jobIds);
}

private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, Function<String, Job> jobCreator,
@@ -102,7 +102,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
jobProvider = new JobProvider(client(), builder.build());
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, new NoOpRenormalizer(),
new JobResultsPersister(nodeSettings(), client()), new ModelSizeStats.Builder(JOB_ID).build()) {
new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
@Override
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@@ -12,6 +12,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilders;

@@ -44,19 +46,19 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.hasEntry;

public class BasicDistributedJobsIT extends BaseMlIntegTestCase {

public void testFailOverBasics() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
ensureStableCluster(4);

Job.Builder job = createJob("fail-over-basics-job");
Job.Builder job = createJob("fail-over-basics-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());

@@ -200,7 +202,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(3);

String jobId = "dedicated-ml-node-job";
Job.Builder job = createJob(jobId);
Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());

@@ -213,10 +215,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
PersistentTask task = tasks.getTask(MlMetadata.jobTaskId(jobId));

DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"));
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
assertEquals(JobState.OPENED, jobTaskStatus.getState());

@@ -284,7 +284,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {

int numJobs = numMlNodes * 10;
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i));
Job.Builder job = createJob(Integer.toString(i), new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());

@@ -401,10 +401,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertNotNull(task.getExecutorNode());
assertFalse(task.needsReassignment(clusterState.nodes()));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"));

JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
@@ -6,10 +6,12 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.junit.After;

@@ -66,6 +68,13 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase {
// This is the key assertion: if renormalization never happened then the record_score would
// be the same as the initial_record_score on the anomaly record that happened earlier
assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore()));

// Since this job ran for 50 buckets, it's a good place to assert
// that established model memory matches model memory in the job stats
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
Job updatedJob = getJob(job.getId()).get(0);
assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
}

private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {
@@ -13,6 +13,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;

@@ -20,6 +21,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.junit.After;

import java.util.ArrayList;

@@ -85,6 +87,13 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
}, 60, TimeUnit.SECONDS);

waitUntilJobIsClosed(job.getId());

// Since this job ran for 168 buckets, it's a good place to assert
// that established model memory matches model memory in the job stats
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
Job updatedJob = getJob(job.getId()).get(0);
assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
}

public void testRealtime() throws Exception {
@@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;

public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

@@ -42,7 +41,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

initClusterAndJob(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
}

public void testEstablishedMem_givenNoStatsLongHistory() throws Exception {

@@ -53,7 +52,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
createBuckets(jobId, 25);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
}

public void testEstablishedMem_givenNoStatsShortHistory() throws Exception {

@@ -64,7 +63,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
createBuckets(jobId, 5);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
}

public void testEstablishedMem_givenHistoryTooShort() throws Exception {

@@ -74,10 +73,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

createBuckets(jobId, 19);
createModelSizeStats(jobId, 1, 19000L);
createModelSizeStats(jobId, 10, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
assertThat(queryEstablishedMemoryUsage(jobId, 19, latestModelSizeStats), equalTo(0L));
}

public void testEstablishedMem_givenHistoryJustEnoughLowVariation() throws Exception {

@@ -87,10 +87,25 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

createBuckets(jobId, 20);
createModelSizeStats(jobId, 1, 19000L);
createModelSizeStats(jobId, 10, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(20000L));
}

public void testEstablishedMem_givenHistoryJustEnoughAndUninitialized() throws Exception {
String jobId = "just-enough-low-cv-established-mem-job";

initClusterAndJob(jobId);

createBuckets(jobId, 20);
createModelSizeStats(jobId, 1, 0L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L));
}

public void testEstablishedMem_givenHistoryJustEnoughHighVariation() throws Exception {

@@ -100,10 +115,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

createBuckets(jobId, 20);
createModelSizeStats(jobId, 1, 1000L);
createModelSizeStats(jobId, 10, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L));
}

public void testEstablishedMem_givenLongEstablished() throws Exception {

@@ -113,10 +129,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

createBuckets(jobId, 25);
createModelSizeStats(jobId, 1, 10000L);
createModelSizeStats(jobId, 2, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 2, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
}

public void testEstablishedMem_givenOneRecentChange() throws Exception {

@@ -126,10 +143,24 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

createBuckets(jobId, 25);
createModelSizeStats(jobId, 1, 10000L);
createModelSizeStats(jobId, 10, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
}

public void testEstablishedMem_givenOneRecentChangeOnlyAndUninitialized() throws Exception {
String jobId = "one-recent-change-established-mem-job";

initClusterAndJob(jobId);

createBuckets(jobId, 25);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(0L));
}

public void testEstablishedMem_givenOneRecentChangeOnly() throws Exception {

@@ -138,10 +169,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
initClusterAndJob(jobId);

createBuckets(jobId, 25);
createModelSizeStats(jobId, 10, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
}

public void testEstablishedMem_givenHistoricHighVariationRecentLowVariation() throws Exception {

@@ -155,10 +187,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
createModelSizeStats(jobId, 10, 6000L);
createModelSizeStats(jobId, 19, 9000L);
createModelSizeStats(jobId, 30, 19000L);
createModelSizeStats(jobId, 35, 20000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 20000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(20000L));
}

public void testEstablishedMem_givenHistoricLowVariationRecentHighVariation() throws Exception {

@@ -172,10 +205,11 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
createModelSizeStats(jobId, 25, 21000L);
createModelSizeStats(jobId, 27, 39000L);
createModelSizeStats(jobId, 30, 67000L);
createModelSizeStats(jobId, 35, 95000L);
ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 95000L);
jobResultsPersister.commitResultWrites(jobId);

assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(0L));
}

private void initClusterAndJob(String jobId) {

@@ -197,21 +231,28 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
builder.executeRequest();
}

private void createModelSizeStats(String jobId, int bucketNum, long modelBytes) {
ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId);
modelSizeStats.setTimestamp(new Date(bucketSpan * bucketNum));
modelSizeStats.setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000)));
modelSizeStats.setModelBytes(modelBytes);
jobResultsPersister.persistModelSizeStats(modelSizeStats.build());
private ModelSizeStats createModelSizeStats(String jobId, int bucketNum, long modelBytes) {
ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(jobId)
.setTimestamp(new Date(bucketSpan * bucketNum))
.setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000)))
.setModelBytes(modelBytes).build();
jobResultsPersister.persistModelSizeStats(modelSizeStats);
return modelSizeStats;
}

private Long queryEstablishedMemoryUsage(String jobId) throws Exception {
return queryEstablishedMemoryUsage(jobId, null, null);
}

private Long queryEstablishedMemoryUsage(String jobId, Integer bucketNum, ModelSizeStats latestModelSizeStats)
throws Exception {
AtomicReference<Long> establishedModelMemoryUsage = new AtomicReference<>();
AtomicReference<Exception> exception = new AtomicReference<>();

CountDownLatch latch = new CountDownLatch(1);

jobProvider.getEstablishedMemoryUsage(jobId, memUse -> {
Date latestBucketTimestamp = (bucketNum != null) ? new Date(bucketSpan * bucketNum) : null;
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> {
establishedModelMemoryUsage.set(memUse);
latch.countDown();
}, e -> {
@@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.index.query.QueryBuilders;

@@ -54,7 +56,7 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
internalCluster().ensureAtLeastNumDataNodes(5);
ensureStableCluster(5);

Job.Builder job = createJob("relocation-job");
Job.Builder job = createJob("relocation-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
@@ -7,12 +7,14 @@ package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.junit.After;

import java.util.ArrayList;

@@ -97,6 +99,13 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase {
GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet();
assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L));
}

// Since this job ran for 3000 buckets, it's a good place to assert
// that established model memory matches model memory in the job stats
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
Job updatedJob = getJob(job.getId()).get(0);
assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
}

private static Map<String, Object> createRecord(long timestamp) {
@@ -11,11 +11,13 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.junit.After;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -105,12 +107,21 @@ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
assertThat(getBuckets(splitJob.getId()).size(), equalTo(oneGoBuckets.size()));
assertThat(getRecords(oneGoJob.getId()).isEmpty(), is(true));
assertThat(getRecords(splitJob.getId()).isEmpty(), is(true));

// Since these jobs ran for 72 buckets, it's a good place to assert
// that established model memory matches model memory in the job stats
for (Job.Builder job : Arrays.asList(oneGoJob, splitJob)) {
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
Job updatedJob = getJob(job.getId()).get(0);
assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
}
}

private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {
Detector.Builder detector = new Detector.Builder("mean", "value");
detector.setByFieldName("by_field");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
Job.Builder job = new Job.Builder(jobId);
job.setAnalysisConfig(analysisConfig);
@@ -9,7 +9,11 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;

@@ -28,7 +32,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
startMlCluster(1, 1);

// create and open first job, which succeeds:
Job.Builder job = createJob("close-failed-job-1");
Job.Builder job = createJob("close-failed-job-1", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());

@@ -40,7 +44,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
});

// create and try to open second job, which fails:
job = createJob("close-failed-job-2");
job = createJob("close-failed-job-2", new ByteSizeValue(2, ByteSizeUnit.MB));
putJobRequest = new PutJobAction.Request(job);
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());

@@ -60,18 +64,23 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
}

public void testSingleNode() throws Exception {
verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 32));
verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100));
}

public void testMultipleNodes() throws Exception {
verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 32));
verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 100));
}

private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
startMlCluster(numNodes, maxNumberOfJobsPerNode);
long maxMlMemoryPerNode = calculateMaxMlMemory();
ByteSizeValue jobModelMemoryLimit = new ByteSizeValue(2, ByteSizeUnit.MB);
long memoryFootprintPerJob = jobModelMemoryLimit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes();
long maxJobsPerNodeDueToMemoryLimit = maxMlMemoryPerNode / memoryFootprintPerJob;
int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode;
boolean expectMemoryLimitBeforeCountLimit = maxJobsPerNodeDueToMemoryLimit < maxNumberOfJobsPerNode;
for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) {
Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i));
Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i), jobModelMemoryLimit);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());

@@ -86,9 +95,19 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
});
logger.info("Opened {}th job", i);
} catch (ElasticsearchStatusException e) {
assertTrue(e.getMessage(), e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" + maxNumberOfJobsPerNode +
"], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]"));
assertTrue(e.getMessage(),
e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
if (expectMemoryLimitBeforeCountLimit) {
int expectedJobsAlreadyOpenOnNode = (i - 1) / numNodes;
assertTrue(e.getMessage(),
e.getMessage().endsWith("because this node has insufficient available memory. Available memory for ML [" +
maxMlMemoryPerNode + "], memory required by existing jobs [" +
(expectedJobsAlreadyOpenOnNode * memoryFootprintPerJob) +
"], estimated memory required for this job [" + memoryFootprintPerJob + "]]"));
} else {
assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" +
maxNumberOfJobsPerNode + "], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]"));
}
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);

// close the first job and check if the latest job gets opened:

@@ -122,4 +141,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(numNodes);
}

private long calculateMaxMlMemory() {
Settings settings = internalCluster().getInstance(Settings.class);
return Long.parseLong(internalCluster().getInstance(TransportService.class).getLocalNode().getAttributes()
.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)) * MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings) / 100;
}
}
@ -431,9 +431,10 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
public void testBuilder_buildWithCreateTime() {
|
||||
Job.Builder builder = buildJobBuilder("foo");
|
||||
Date now = new Date();
|
||||
Job job = builder.build(now);
|
||||
Job job = builder.setEstablishedModelMemory(randomNonNegativeLong()).build(now);
|
||||
assertEquals(now, job.getCreateTime());
|
||||
assertEquals(Version.CURRENT, job.getJobVersion());
|
||||
assertNull(job.getEstablishedModelMemory());
|
||||
}
|
||||
|
||||
public void testJobWithoutVersion() throws IOException {
|
||||
|
@ -516,6 +517,39 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
assertThat(e.getMessage(), containsString("Invalid group id '$$$'"));
|
||||
}
|
||||
|
||||
public void testEstimateMemoryFootprint_GivenEstablished() {
|
||||
Job.Builder builder = buildJobBuilder("established");
|
||||
long establishedModelMemory = randomIntBetween(10_000, 2_000_000_000);
|
||||
builder.setEstablishedModelMemory(establishedModelMemory);
|
||||
if (randomBoolean()) {
|
||||
builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), null));
|
||||
}
|
||||
assertEquals(establishedModelMemory + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
|
||||
}
|
||||
|
||||
public void testEstimateMemoryFootprint_GivenLimitAndNotEstablished() {
|
||||
Job.Builder builder = buildJobBuilder("limit");
|
||||
if (rarely()) {
|
||||
// An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory()
|
||||
// will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0.
|
||||
builder.setEstablishedModelMemory(0L);
|
||||
}
|
||||
ByteSizeValue limit = new ByteSizeValue(randomIntBetween(100, 10000), ByteSizeUnit.MB);
|
||||
builder.setAnalysisLimits(new AnalysisLimits(limit.getMb(), null));
|
||||
assertEquals(limit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
|
||||
}
|
||||
|
||||
public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() {
|
||||
Job.Builder builder = buildJobBuilder("nolimit");
|
||||
if (rarely()) {
|
||||
// An "established" model memory of 0 means "not established". Generally this won't be set, so getEstablishedModelMemory()
|
||||
// will return null, but if it returns 0 we shouldn't estimate the job's memory requirement to be 0.
|
||||
builder.setEstablishedModelMemory(0L);
|
||||
}
|
||||
assertEquals(ByteSizeUnit.MB.toBytes(JobUpdate.UNDEFINED_MODEL_MEMORY_LIMIT_DEFAULT) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(),
|
||||
builder.build().estimateMemoryFootprint());
|
||||
}
|
||||
|
||||
public static Job.Builder buildJobBuilder(String id, Date date) {
|
||||
Job.Builder builder = new Job.Builder(id);
|
||||
builder.setCreateTime(date);
|
||||
|
@ -566,6 +600,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
if (randomBoolean()) {
|
||||
builder.setLastDataTime(new Date(randomNonNegativeLong()));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setEstablishedModelMemory(randomNonNegativeLong());
|
||||
}
|
||||
builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
|
||||
builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized());
|
||||
|
||||
|
|
|
@ -85,6 +85,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
if (randomBoolean()) {
update.setModelSnapshotId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
update.setEstablishedModelMemory(randomNonNegativeLong());
}

return update.build();
}
@ -7,10 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;

@ -33,11 +35,13 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;

@ -55,6 +59,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
private Client client;
private Renormalizer renormalizer;
private JobResultsPersister persister;
private JobProvider jobProvider;
private FlushListener flushListener;
private AutoDetectResultProcessor processorUnderTest;

@ -63,12 +68,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
client = mock(Client.class);
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
jobProvider = mock(JobProvider.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister,
new ModelSizeStats.Builder(JOB_ID).build(), flushListener);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, jobProvider,
new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener);
}

public void testProcess() throws TimeoutException {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);

@ -259,6 +267,36 @@ public class AutoDetectResultProcessorTests extends ESTestCase {

verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verifyNoMoreInteractions(persister);
// No interactions with the jobProvider confirms that the established memory calculation did not run
verifyNoMoreInteractions(jobProvider);
assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
}

public void testProcessResult_modelSizeStatsAfterManyBuckets() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);

AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) {
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(context, result);
}

AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);

verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verify(persister, times(1)).commitResultWrites(JOB_ID);
verifyNoMoreInteractions(persister);
verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), isNull(Date.class), eq(modelSizeStats),
any(Consumer.class), any(Consumer.class));
verifyNoMoreInteractions(jobProvider);
assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
}

@ -273,12 +311,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getModelSnapshot()).thenReturn(modelSnapshot);
processorUnderTest.processResult(context, result);

verify(persister, times(1)).persistModelSnapshot(modelSnapshot);
verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID,
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());

verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
verify(persister).commitResultWrites(JOB_ID);
verifyNoMoreInteractions(persister);
}

@ -301,6 +338,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}

public void testAwaitCompletion() throws TimeoutException {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);

@ -316,6 +355,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}

public void testPersisterThrowingDoesntBlockProcessing() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot);

@ -329,10 +370,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(process.isProcessAliveAfterWaiting()).thenReturn(true);
when(process.readAutodetectResults()).thenReturn(iterator);

doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any());
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any());

processorUnderTest.process(process);
verify(persister, times(2)).persistModelSnapshot(any());
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
}

public void testParsingErrorSetsFailed() {
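The expectations in testProcessResult_modelSizeStatsAfterManyBuckets outline the new behaviour of the result processor: it counts buckets, and once JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE of them have been processed, the next model size stats triggers a commit of pending result writes followed by a query for the job's established memory usage. A rough, illustrative fragment of that reaction; everything except the calls the test verifies (persistModelSizeStats, commitResultWrites, getEstablishedMemoryUsage) is a hypothetical name, not the actual implementation:

// Hedged sketch only; bucketsSeen and updateJobWithEstablishedMemory are invented for illustration.
void onModelSizeStats(String jobId, ModelSizeStats modelSizeStats, long bucketsSeen,
                      JobResultsPersister persister, JobProvider jobProvider,
                      Consumer<Long> updateJobWithEstablishedMemory) {
    persister.persistModelSizeStats(modelSizeStats);
    if (bucketsSeen >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
        // Make the recently written buckets searchable before they feed the calculation.
        persister.commitResultWrites(jobId);
        jobProvider.getEstablishedMemoryUsage(jobId, null, modelSizeStats,
                updateJobWithEstablishedMemory,
                e -> { /* best effort: log and carry on */ });
    }
}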
@ -16,6 +16,7 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.indices.recovery.RecoveryState;

@ -36,6 +37,7 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;

@ -118,6 +120,10 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
}

protected Job.Builder createJob(String id) {
return createJob(id, null);
}

protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

@ -127,13 +133,19 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {

Job.Builder builder = new Job.Builder();
builder.setId(id);

if (modelMemoryLimit != null) {
builder.setAnalysisLimits(new AnalysisLimits(modelMemoryLimit.getMb(), null));
}
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}

public static Job.Builder createFareQuoteJob(String id) {
return createFareQuoteJob(id, null);
}

public static Job.Builder createFareQuoteJob(String id, ByteSizeValue modelMemoryLimit) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH);

@ -146,6 +158,9 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {

Job.Builder builder = new Job.Builder();
builder.setId(id);
if (modelMemoryLimit != null) {
builder.setAnalysisLimits(new AnalysisLimits(modelMemoryLimit.getMb(), null));
}
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
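These new overloads let integration tests pin a job's model memory limit up front, which matters once allocation decisions take estimated memory into account. A hypothetical caller in a test that extends BaseMlIntegTestCase might look like this; the job ids, the 20 MB figure, and the ByteSizeUnit import are illustrative, not taken from the change:

// Cap the job's model memory at 20 MB so its estimated footprint stays small and predictable.
Job.Builder smallJob = createJob("small-memory-job", new ByteSizeValue(20, ByteSizeUnit.MB));
// Passing no limit keeps the previous behaviour and falls back to the defaults.
Job.Builder defaultJob = createJob("default-memory-job");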
@ -75,7 +75,8 @@ public class InternalClientIntegTests extends ESSingleNodeTestCase {
String scrollId = randomAlphaOfLength(5);
SearchHit[] hits = new SearchHit[] {new SearchHit(1)};
InternalSearchResponse internalResponse = new InternalSearchResponse(new SearchHits(hits, 1, 1), null, null, null, false, false, 1);
SearchResponse response = new SearchResponse(internalResponse, scrollId, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY);
SearchResponse response = new SearchResponse(internalResponse, scrollId, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);

Answer<?> returnResponse = invocation -> {
@SuppressWarnings("unchecked")
@ -140,7 +140,7 @@ public class WatcherServiceTests extends ESTestCase {
// empty scroll response, no further scrolling needed
SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1);
SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10,
ShardSearchFailure.EMPTY_ARRAY);
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);

// one search response containing active and inactive watches
int count = randomIntBetween(2, 200);

@ -166,7 +166,8 @@ public class WatcherServiceTests extends ESTestCase {
}
SearchHits searchHits = new SearchHits(hits, count, 1.0f);
SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);
SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY);
SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);

// we do need to use this kind of mocking because of the internal client, which calls doExecute at the end on the supplied
// client instance
@ -84,7 +84,8 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC

InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
new SearchHits(new SearchHit[]{hit}, 1L, 1f), null, null, null, false, false, 1);
SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);

WatchExecutionContext ctx = mockExecutionContext("_watch_name", new Payload.XContent(response));
assertThat(condition.execute(ctx).met(), is(true));
@ -94,7 +94,8 @@ public class ScriptConditionTests extends ESTestCase {

public void testExecute() throws Exception {
ScriptCondition condition = new ScriptCondition(mockScript("ctx.payload.hits.total > 1"), scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(condition.execute(ctx).met());
}

@ -102,7 +103,8 @@ public class ScriptConditionTests extends ESTestCase {
public void testExecuteMergedParams() throws Exception {
Script script = new Script(ScriptType.INLINE, "mockscript", "ctx.payload.hits.total > threshold", singletonMap("threshold", 1));
ScriptCondition executable = new ScriptCondition(script, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(executable.execute(ctx).met());
}

@ -115,7 +117,8 @@ public class ScriptConditionTests extends ESTestCase {
parser.nextToken();
ScriptCondition executable = ScriptCondition.parse(scriptService, "_watch", parser);

SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));

assertFalse(executable.execute(ctx).met());

@ -179,7 +182,8 @@ public class ScriptConditionTests extends ESTestCase {
public void testScriptConditionThrowException() throws Exception {
ScriptCondition condition = new ScriptCondition(
mockScript("null.foo"), scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
ScriptException exception = expectThrows(ScriptException.class, () -> condition.execute(ctx));
assertThat(exception.getMessage(), containsString("Error evaluating null.foo"));

@ -187,7 +191,8 @@ public class ScriptConditionTests extends ESTestCase {

public void testScriptConditionReturnObjectThrowsException() throws Exception {
ScriptCondition condition = new ScriptCondition(mockScript("return new Object()"), scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
Exception exception = expectThrows(IllegalStateException.class, () -> condition.execute(ctx));
assertThat(exception.getMessage(),

@ -197,7 +202,8 @@ public class ScriptConditionTests extends ESTestCase {
public void testScriptConditionAccessCtx() throws Exception {
ScriptCondition condition = new ScriptCondition(mockScript("ctx.trigger.scheduled_time.getMillis() < new Date().time"),
scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, new ShardSearchFailure[0]);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 0, 500L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
WatchExecutionContext ctx = mockExecutionContext("_name", new DateTime(DateTimeZone.UTC), new Payload.XContent(response));
Thread.sleep(10);
assertThat(condition.execute(ctx).met(), is(true));
@ -199,8 +199,8 @@ public class TriggeredWatchStoreTests extends ESTestCase {
hit.sourceRef(source);
hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f);
SearchResponse searchResponse2 = new SearchResponse(
new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null);
SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null);
new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 0, 1, null, null);
SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null, null);

doAnswer(invocation -> {
SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[0];
@ -81,7 +81,7 @@ public class SearchInputTests extends ESTestCase {
ArgumentCaptor<SearchRequest> requestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234,
ShardSearchFailure.EMPTY_ARRAY);
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
searchFuture.onResponse(searchResponse);
when(client.search(requestCaptor.capture())).thenReturn(searchFuture);

@ -104,7 +104,7 @@ public class SearchInputTests extends ESTestCase {
ArgumentCaptor<SearchRequest> requestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234,
ShardSearchFailure.EMPTY_ARRAY);
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
searchFuture.onResponse(searchResponse);
when(client.search(requestCaptor.capture())).thenReturn(searchFuture);

@ -146,7 +146,7 @@ public class SearchInputTests extends ESTestCase {
ArgumentCaptor<SearchRequest> requestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234,
ShardSearchFailure.EMPTY_ARRAY);
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
searchFuture.onResponse(searchResponse);
when(client.search(requestCaptor.capture())).thenReturn(searchFuture);
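The Watcher test edits above are all the same mechanical adjustment: the SearchResponse constructor used by these tests now takes an extra argument, and single-cluster tests pass SearchResponse.Clusters.EMPTY for it. A minimal instance of the pattern, reusing the values from the tests above (which appear to be total shards, successful shards, skipped shards, and took-millis):

// Single-cluster test fixture updated for the extra Clusters argument.
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), "", 1, 1, 0, 1234,
        ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);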