diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java b/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java index 1945def2437..510af33f3c7 100644 --- a/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java +++ b/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.license; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -230,16 +231,20 @@ public class XPackInfoResponse extends ActionResponse { @Nullable private final String description; private final boolean available; private final boolean enabled; + @Nullable private final Map nativeCodeInfo; public FeatureSet(StreamInput in) throws IOException { - this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean()); + this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean(), + in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED) ? 
in.readMap() : null); } - public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled) { + public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled, + @Nullable Map nativeCodeInfo) { this.name = name; this.description = description; this.available = available; this.enabled = enabled; + this.nativeCodeInfo = nativeCodeInfo; } public String name() { @@ -259,6 +264,11 @@ public class XPackInfoResponse extends ActionResponse { return enabled; } + @Nullable + public Map nativeCodeInfo() { + return nativeCodeInfo; + } + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (description != null) { @@ -266,6 +276,9 @@ public class XPackInfoResponse extends ActionResponse { } builder.field("available", available); builder.field("enabled", enabled); + if (nativeCodeInfo != null) { + builder.field("native_code_info", nativeCodeInfo); + } return builder.endObject(); } @@ -274,6 +287,9 @@ public class XPackInfoResponse extends ActionResponse { out.writeOptionalString(description); out.writeBoolean(available); out.writeBoolean(enabled); + if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + out.writeMap(nativeCodeInfo); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java index 6995bf66d61..221f34fd44f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Map; public interface XPackFeatureSet { @@ -23,6 +24,8 @@ public interface XPackFeatureSet { boolean enabled(); + Map nativeCodeInfo(); + Usage usage(); abstract class Usage implements ToXContentObject, 
NamedWriteable { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java b/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java index fe764a74b94..0cbb050b646 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java @@ -59,7 +59,8 @@ public class TransportXPackInfoAction extends HandledTransportAction featureSets = this.featureSets.stream().map(fs -> - new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled())) + new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled(), + request.isVerbose() ? fs.nativeCodeInfo() : null)) .collect(Collectors.toSet()); featureSetsInfo = new XPackInfoResponse.FeatureSetsInfo(featureSets); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java index 072440730ad..f4e74f179ab 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.graph; import java.io.IOException; +import java.util.Map; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -47,6 +48,11 @@ public class GraphFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0fc8b60a5bf..779f429d9b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ 
b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Module; @@ -24,7 +23,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.ActionPlugin; @@ -81,6 +79,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeController; +import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.ProcessCtrl; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -121,6 +120,7 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentActionCoordinator; import 
org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; @@ -128,7 +128,6 @@ import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -156,6 +155,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled"; public static final Setting ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR, XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); + public static final Setting CONCURRENT_JOB_ALLOCATIONS = + Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); private final Settings settings; private final Environment env; @@ -178,6 +179,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { return Collections.unmodifiableList( Arrays.asList(USE_NATIVE_PROCESS_OPTION, ALLOCATION_ENABLED, + CONCURRENT_JOB_ALLOCATIONS, ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING, ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, @@ -256,12 +258,17 @@ public class MachineLearning extends Plugin implements ActionPlugin { NormalizerProcessFactory normalizerProcessFactory; if (USE_NATIVE_PROCESS_OPTION.get(settings)) { try { - NativeController nativeController = new NativeController(env, new NamedPipeHelper()); - nativeController.tailLogsInThread(); + NativeController nativeController = 
NativeControllerHolder.getNativeController(settings); + if (nativeController == null) { + // This will only happen when path.home is not set, which is disallowed in production + throw new ElasticsearchException("Failed to create native process controller for Machine Learning"); + } autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client); normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController); } catch (IOException e) { - throw new ElasticsearchException("Failed to create native process factories", e); + // This also should not happen in production, as the MachineLearningFeatureSet should have + // hit the same error first and brought down the node with a friendlier error message + throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); } } else { autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) -> diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 131825d81c6..4711dd6ed33 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -5,26 +5,50 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; +import 
org.elasticsearch.xpack.ml.job.process.NativeController; +import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeoutException; public class MachineLearningFeatureSet implements XPackFeatureSet { private final boolean enabled; private final XPackLicenseState licenseState; + private final Map nativeCodeInfo; @Inject public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) { this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings); this.licenseState = licenseState; + Map nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO; + // Don't try to get the native code version in the transport client - the controller process won't be running + if (XPackPlugin.transportClientMode(settings) == false) { + try { + NativeController nativeController = NativeControllerHolder.getNativeController(settings); + if (nativeController != null) { + nativeCodeInfo = nativeController.getNativeCodeInfo(); + } + } catch (IOException | TimeoutException e) { + Loggers.getLogger(MachineLearningFeatureSet.class).error("Cannot get native code info for Machine Learning", e); + if (enabled) { + throw new ElasticsearchException("Cannot communicate with Machine Learning native code " + + "- please check that you are running on a supported platform"); + } + } + } + this.nativeCodeInfo = nativeCodeInfo; } @Override @@ -47,6 +71,11 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return nativeCodeInfo; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index c36dc601a1b..a254f3a177f 100644 --- 
a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -46,7 +47,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; +import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; @@ -225,22 +226,22 @@ public class CloseJobAction extends Action { + private final JobStateObserver observer; private final ClusterService clusterService; private final TransportListTasksAction listTasksAction; private final TransportCancelTasksAction cancelTasksAction; - private final PersistentTaskClusterService persistentTaskClusterService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, TransportListTasksAction listTasksAction, - TransportCancelTasksAction cancelTasksAction, PersistentTaskClusterService persistentTaskClusterService) { + TransportCancelTasksAction cancelTasksAction) { super(settings, CloseJobAction.NAME, transportService, clusterService, 
threadPool, actionFilters, indexNameExpressionResolver, Request::new); + this.observer = new JobStateObserver(threadPool, clusterService); this.clusterService = clusterService; this.listTasksAction = listTasksAction; this.cancelTasksAction = cancelTasksAction; - this.persistentTaskClusterService = persistentTaskClusterService; } @Override @@ -255,7 +256,6 @@ public class CloseJobAction extends Action listener) throws Exception { - PersistentTaskInProgress task = validateAndFindTask(request.getJobId(), state); clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -269,28 +269,39 @@ public class CloseJobAction extends Action { - String expectedDescription = "job-" + request.getJobId(); - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(taskInfo.getTaskId()); - cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { - persistentTaskClusterService.completeOrRestartPersistentTask(task.getId(), null, - ActionListener.wrap( - empty -> listener.onResponse(new CloseJobAction.Response(true)), - listener::onFailure - ) - ); - }, listener::onFailure)); - return; - } + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); - }, listener::onFailure)); + + @Override + protected void doRun() throws Exception { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); + listTasksAction.execute(listTasksRequest, 
ActionListener.wrap(listTasksResponse -> { + String expectedDescription = "job-" + request.getJobId(); + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (expectedDescription.equals(taskInfo.getDescription())) { + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(taskInfo.getTaskId()); + cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { + observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> { + if (e == null) { + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } + }); + }, listener::onFailure)); + return; + } + } + listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); + }, listener::onFailure)); + } + }); } }); } @@ -333,11 +344,9 @@ public class CloseJobAction extends Action task = validateAndFindTask(jobId, currentState); PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); - for (PersistentTaskInProgress taskInProgress : currentTasks.tasks()) { - if (taskInProgress.getId() == task.getId()) { - updatedTasks.put(taskInProgress.getId(), new PersistentTaskInProgress<>(taskInProgress, JobState.CLOSING)); - } - } + PersistentTaskInProgress taskToUpdate = currentTasks.getTask(task.getId()); + taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING); + updatedTasks.put(taskToUpdate.getId(), taskToUpdate); PersistentTasksInProgress newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks); MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 5b0a645b569..499e569e6db 100644 --- 
a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -28,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -94,10 +92,6 @@ public class FlushJobAction extends Action listener) { - jobManager.getJobOrThrowIfUnknown(request.getJobId()); - InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); if (request.getAdvanceTime() != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index ccaefec33e7..81e0914772f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -56,6 +56,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; +import 
java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -247,6 +249,8 @@ public class OpenJobAction extends Action reasons = new LinkedList<>(); + DiscoveryNode minLoadedNode = null; PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); if ("true".equals(allocationEnabled) == false) { - logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node); + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node."; + logger.debug(reason); + reasons.add(reason); continue; } - long numberOfOpenedJobs; + long numberOfAssignedJobs; + int numberOfAllocatingJobs; if (persistentTasksInProgress != null) { - numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + numberOfAssignedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + numberOfAllocatingJobs = persistentTasksInProgress.findTasks(OpenJobAction.NAME, task -> { + if (node.getId().equals(task.getExecutorNode()) == false) { + return false; + } + JobState jobTaskState = (JobState) task.getStatus(); + return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING + jobTaskState == JobState.OPENING || // executor node is busy starting the cpp process + task.isCurrentStatus() == false; // previous executor node failed and + // current executor node didn't have the chance to set job status to OPENING + }).size(); } else { - numberOfOpenedJobs = 0; + numberOfAssignedJobs = 0; + numberOfAllocatingJobs = 0; } - long maxNumberOfOpenJobs = 
Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); - long available = maxNumberOfOpenJobs - numberOfOpenedJobs; - if (available == 0) { - logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]", - jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs); + if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) { + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs + + "] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state"; + logger.debug(reason); + reasons.add(reason); continue; } + + long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); + long available = maxNumberOfOpenJobs - numberOfAssignedJobs; + if (available == 0) { + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. 
" + + "Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() + + " [" + maxNumberOfOpenJobs + "]"; + logger.debug(reason); + reasons.add(reason); + continue; + } + if (maxAvailable < available) { maxAvailable = available; - leastLoadedNode = node; + minLoadedNode = node; } } - return leastLoadedNode; + if (minLoadedNode != null) { + logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId); + } else { + logger.info("no node selected for job [{}], reasons [{}]", jobId, String.join(",\n", reasons)); + } + return minLoadedNode; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index 4be40231cdd..b67c0a5d126 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -223,11 +221,10 @@ public class PostDataAction extends Action listener) { + protected void 
innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription())); try { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 133f1a0a328..25fa5575aca 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -346,10 +346,6 @@ public class StartDatafeedAction throw ExceptionsHelper.missingJobException(datafeed.getJobId()); } DatafeedJobValidator.validate(datafeed, job); - if (tasks == null) { - return; - } - JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); if (jobState != JobState.OPENED) { throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index a411bc41509..bea8c998635 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -7,16 +7,14 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import 
org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; @@ -25,18 +23,18 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; import java.util.List; -import java.util.function.Function; import java.util.function.Supplier; /** @@ -47,55 +45,49 @@ import java.util.function.Supplier; public abstract class TransportJobTaskAction, Response extends BaseTasksResponse & Writeable> extends TransportTasksAction { - protected final JobManager jobManager; protected final AutodetectProcessManager processManager; - private final Function jobIdFromRequest; - private final TransportListTasksAction listTasksAction; TransportJobTaskAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, 
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier requestSupplier, - Supplier responseSupplier, String nodeExecutor, JobManager jobManager, - AutodetectProcessManager processManager, Function jobIdFromRequest, - TransportListTasksAction listTasksAction) { + Supplier responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) { super(settings, actionName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, requestSupplier, responseSupplier, nodeExecutor); - this.jobManager = jobManager; this.processManager = processManager; - this.jobIdFromRequest = jobIdFromRequest; - this.listTasksAction = listTasksAction; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { - // the same validation that exists in AutodetectProcessManager#processData(...) and flush(...) methods - // is required here too because if the job hasn't been opened yet then no task exist for it yet and then - // #taskOperation(...) method will not be invoked, returning an empty result to the client. - // This ensures that we return an understandable error: - String jobId = jobIdFromRequest.apply(request); - jobManager.getJobOrThrowIfUnknown(jobId); - JobState jobState = jobManager.getJobState(jobId); - if (jobState != JobState.OPENED) { - listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + jobState + + String jobId = request.getJobId(); + // We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the + // node running the job task. 
+ ClusterState state = clusterService.state(); + JobManager.getJobOrThrowIfUnknown(state, jobId); + PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress.PersistentTaskInProgress jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask == null || jobTask.getExecutorNode() == null) { + listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); } else { - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); - listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> { - String expectedDescription = "job-" + jobId; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - request.setTaskId(taskInfo.getTaskId()); - super.doExecute(task, request, listener); - return; - } - } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + jobId + "] " + listTasksResponse)); - }, listener::onFailure)); + request.setNodes(jobTask.getExecutorNode()); + super.doExecute(task, request, listener); } } + @Override + protected final void taskOperation(Request request, OperationTask task, ActionListener listener) { + PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE); + JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks); + if (jobState == JobState.OPENED) { + innerTaskOperation(request, task, listener); + } else { + listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState + + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); + } + } + + protected abstract void 
innerTaskOperation(Request request, OperationTask task, ActionListener listener); + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java index 11799bf960e..1f1cfd61f61 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java @@ -24,18 +24,23 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; public class UpdateJobAction extends Action { public static final UpdateJobAction INSTANCE = new UpdateJobAction(); @@ -58,7 +63,7 @@ public class UpdateJobAction extends Action implements ToXContent { public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) { - JobUpdate update 
= JobUpdate.PARSER.apply(parser, null); + JobUpdate update = JobUpdate.PARSER.apply(parser, null).build(); return new UpdateJobAction.Request(jobId, update); } @@ -132,6 +137,7 @@ public class UpdateJobAction extends Action { + private final ConcurrentMap semaphoreByJob = ConcurrentCollections.newConcurrentMap(); private final JobManager jobManager; private final Client client; @@ -162,14 +168,33 @@ public class UpdateJobAction extends Action wrappedListener = listener; - if (request.getJobUpdate().isAutodetectProcessUpdate()) { - wrappedListener = ActionListener.wrap( + PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE); + boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED; + + semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire(); + + ActionListener wrappedListener; + if (jobIsOpen && request.getJobUpdate().isAutodetectProcessUpdate()) { + wrappedListener = ActionListener.wrap( response -> updateProcess(request, response, listener), - listener::onFailure); + e -> { + releaseJobSemaphore(request.getJobId()); + listener.onFailure(e); + }); + } + else { + wrappedListener = ActionListener.wrap( + response -> { + releaseJobSemaphore(request.getJobId()); + listener.onResponse(response); + }, + e -> { + releaseJobSemaphore(request.getJobId()); + listener.onFailure(e); + }); } - jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener); + jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, client, wrappedListener); } private void updateProcess(Request request, PutJobAction.Response updateConfigResponse, @@ -177,19 +202,26 @@ public class UpdateJobAction extends Action() { @Override public void onResponse(UpdateProcessAction.Response response) { + releaseJobSemaphore(request.getJobId()); listener.onResponse(updateConfigResponse); } @Override public void onFailure(Exception e) { + 
releaseJobSemaphore(request.getJobId()); listener.onFailure(e); } }); } + private void releaseJobSemaphore(String jobId) { + semaphoreByJob.remove(jobId).release(); + } + @Override protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index 192cdb49ad2..1cb051ebc95 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -25,7 +24,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -37,7 +35,6 @@ import java.util.Objects; public class UpdateProcessAction extends Action { - public static final UpdateProcessAction INSTANCE = new UpdateProcessAction(); public static final String NAME = "cluster:admin/ml/job/update/process"; @@ -182,10 +179,9 @@ public class UpdateProcessAction extends @Inject public 
TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) { + AutodetectProcessManager processManager) { super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId, - listTasksAction); + Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager); } @Override @@ -196,18 +192,11 @@ public class UpdateProcessAction extends } @Override - protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { try { - if (request.getModelDebugConfig() != null) { - processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig()); - } - if (request.getDetectorUpdates() != null) { - for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) { - processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules()); - } - } - + processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), + request.getModelDebugConfig()); listener.onResponse(new Response()); } catch (Exception e) { listener.onFailure(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1d918d5f2f9..cd0cde07e01 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java 
@@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -150,7 +149,7 @@ public class JobManager extends AbstractComponent { * @throws org.elasticsearch.ResourceNotFoundException * if there is no job with matching the given {@code jobId} */ - Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { + public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); Job job = mlMetadata.getJobs().get(jobId); if (job == null) { @@ -198,7 +197,9 @@ public class JobManager extends AbstractComponent { }); } - public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener actionListener) { + public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, Client client, + ActionListener actionListener) { + clusterService.submitStateUpdateTask("update-job-" + jobId, new AckedClusterStateUpdateTask(request, actionListener) { private Job updatedJob; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 46dfe2f2c7a..5680251284d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -134,32 +135,11 @@ public 
class Job extends AbstractDiffable implements Writeable, ToXContent private final String resultsIndexName; private final boolean deleted; - public Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime, + private Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelDebugConfig modelDebugConfig, Long renormalizationWindowDays, Long backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, String modelSnapshotId, String resultsIndexName, boolean deleted) { - if (analysisConfig == null) { - throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MISSING_ANALYSISCONFIG)); - } - - checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays); - checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL, "backgroundPersistInterval", backgroundPersistInterval); - checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays); - checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays); - - if (!MlStrings.isValidId(jobId)) { - throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), jobId)); - } - if (jobId.length() > MAX_JOB_ID_LENGTH) { - throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MAX_JOB_ID_LENGTH)); - } - - if (Strings.isNullOrEmpty(resultsIndexName)) { - resultsIndexName = jobId; - } else if (!MlStrings.isValidId(resultsIndexName)) { - throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName())); - } this.jobId = jobId; this.description = description; @@ -564,7 +544,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent } public void setAnalysisConfig(AnalysisConfig.Builder configBuilder) { - analysisConfig = 
configBuilder.build(); + analysisConfig = ExceptionsHelper.requireNonNull(configBuilder, ANALYSIS_CONFIG.getPreferredName()).build(); } public void setAnalysisLimits(AnalysisLimits analysisLimits) { @@ -597,7 +577,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent } public void setDataDescription(DataDescription.Builder description) { - dataDescription = description.build(); + dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); } public void setModelDebugConfig(ModelDebugConfig modelDebugConfig) { @@ -659,6 +639,28 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent modelSnapshotId = this.modelSnapshotId; } + if (analysisConfig == null) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MISSING_ANALYSISCONFIG)); + } + + checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays); + checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL, "backgroundPersistInterval", backgroundPersistInterval); + checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays); + checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays); + + if (!MlStrings.isValidId(id)) { + throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), id)); + } + if (id.length() > MAX_JOB_ID_LENGTH) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MAX_JOB_ID_LENGTH)); + } + + if (Strings.isNullOrEmpty(resultsIndexName)) { + resultsIndexName = id; + } else if (!MlStrings.isValidId(resultsIndexName)) { + throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName())); + } + return new Job( id, description, createTime, finishedTime, lastDataTime, analysisConfig, analysisLimits, dataDescription, modelDebugConfig, renormalizationWindowDays, backgroundPersistInterval, diff --git 
a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java index b022bfdf9e8..07b59bb56cc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java @@ -23,25 +23,19 @@ import java.util.Objects; public class JobUpdate implements Writeable, ToXContent { public static final ParseField DETECTORS = new ParseField("detectors"); - @SuppressWarnings("unchecked") - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("job_update", a -> new JobUpdate((String) a[0], (List) a[1], - (ModelDebugConfig) a[2], (AnalysisLimits) a[3], (Long) a[4], (Long) a[5], (Long) a[6], (Long) a[7], - (List) a[8], (Map) a[9])); - + public static final ObjectParser PARSER = new ObjectParser<>("job_udpate", Builder::new); static { - PARSER.declareStringOrNull(ConstructingObjectParser.optionalConstructorArg(), Job.DESCRIPTION); - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), DetectorUpdate.PARSER, DETECTORS); - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG); - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.BACKGROUND_PERSIST_INTERVAL); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RENORMALIZATION_WINDOW_DAYS); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RESULTS_RETENTION_DAYS); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.MODEL_SNAPSHOT_RETENTION_DAYS); - PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), AnalysisConfig.CATEGORIZATION_FILTERS); - 
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), Job.CUSTOM_SETTINGS, - ObjectParser.ValueType.OBJECT); + PARSER.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION); + PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS); + PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG); + PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS); + PARSER.declareLong(Builder::setBackgroundPersistInterval, Job.BACKGROUND_PERSIST_INTERVAL); + PARSER.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS); + PARSER.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS); + PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS); + PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); + PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); } private final String description; @@ -55,11 +49,11 @@ public class JobUpdate implements Writeable, ToXContent { private final List categorizationFilters; private final Map customSettings; - public JobUpdate(@Nullable String description, @Nullable List detectorUpdates, - @Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits, - @Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays, - @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, - @Nullable List categorisationFilters, @Nullable Map customSettings) { + private JobUpdate(@Nullable String description, @Nullable List detectorUpdates, + @Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits, + @Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays, + @Nullable Long resultsRetentionDays, 
@Nullable Long modelSnapshotRetentionDays, + @Nullable List categorisationFilters, @Nullable Map customSettings) { this.description = description; this.detectorUpdates = detectorUpdates; this.modelDebugConfig = modelDebugConfig; @@ -377,4 +371,74 @@ public class JobUpdate implements Writeable, ToXContent { && Objects.equals(this.rules, that.rules); } } + + public static class Builder { + private String description; + private List detectorUpdates; + private ModelDebugConfig modelDebugConfig; + private AnalysisLimits analysisLimits; + private Long renormalizationWindowDays; + private Long backgroundPersistInterval; + private Long modelSnapshotRetentionDays; + private Long resultsRetentionDays; + private List categorizationFilters; + private Map customSettings; + + public Builder() {} + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setDetectorUpdates(List detectorUpdates) { + this.detectorUpdates = detectorUpdates; + return this; + } + + public Builder setModelDebugConfig(ModelDebugConfig modelDebugConfig) { + this.modelDebugConfig = modelDebugConfig; + return this; + } + + public Builder setAnalysisLimits(AnalysisLimits analysisLimits) { + this.analysisLimits = analysisLimits; + return this; + } + + public Builder setRenormalizationWindowDays(Long renormalizationWindowDays) { + this.renormalizationWindowDays = renormalizationWindowDays; + return this; + } + + public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) { + this.backgroundPersistInterval = backgroundPersistInterval; + return this; + } + + public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) { + this.modelSnapshotRetentionDays = modelSnapshotRetentionDays; + return this; + } + + public Builder setResultsRetentionDays(Long resultsRetentionDays) { + this.resultsRetentionDays = resultsRetentionDays; + return this; + } + + public Builder setCategorizationFilters(List 
categorizationFilters) { + this.categorizationFilters = categorizationFilters; + return this; + } + + public Builder setCustomSettings(Map customSettings) { + this.customSettings = customSettings; + return this; + } + + public JobUpdate build() { + return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval, + renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java index e876f8d5d08..cc72aa47c89 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java @@ -403,11 +403,13 @@ public class MlMetadata implements MetaData.Custom { public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) { PersistentTasksInProgress.PersistentTaskInProgress task = getJobTask(jobId, tasks); if (task != null && task.getStatus() != null) { - return (JobState) task.getStatus(); - } else { - // If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed - return JobState.CLOSED; + JobState jobTaskState = (JobState) task.getStatus(); + if (jobTaskState != null) { + return jobTaskState; + } } + // If we haven't opened a job then there will be no persistent task, which is the same as if the job was closed + return JobState.CLOSED; } public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java index 62342124526..aca0a8f4ce7 100644 ---
a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; @@ -15,8 +16,13 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** @@ -30,6 +36,15 @@ public class NativeController { private static final String START_COMMAND = "start"; + public static final Map UNKNOWN_NATIVE_CODE_INFO; + + static { + Map unknownInfo = new HashMap<>(2); + unknownInfo.put("version", "N/A"); + unknownInfo.put("build_hash", "N/A"); + UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo); + } + private final CppLogMessageHandler cppLogHandler; private final OutputStream commandStream; private Thread logTailThread; @@ -59,6 +74,23 @@ public class NativeController { return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT); } + public Map getNativeCodeInfo() throws TimeoutException { + String copyrightMessage = cppLogHandler.getCppCopyright(CONTROLLER_CONNECT_TIMEOUT); + Matcher matcher = Pattern.compile("Version (.+) \\(Build ([0-9a-f]+)\\) Copyright ").matcher(copyrightMessage); + if (matcher.find()) { + Map info = new HashMap<>(2); + info.put("version", matcher.group(1)); + info.put("build_hash", matcher.group(2)); + return info; + } else { + // If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc + // in the 
machine-learning-cpp repo without changing the pattern above to match + String msg = "Unexpected native controller process copyright format: " + copyrightMessage; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + } + public void startProcess(List command) throws IOException { // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process for (String arg : command) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java new file mode 100644 index 00000000000..b67c0a9fcc2 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; + +import java.io.IOException; + +/** + * Manages a singleton NativeController so that both the MachineLearningFeatureSet and MachineLearning classes can + * get access to the same one. + */ +public class NativeControllerHolder { + + private static final Object lock = new Object(); + private static NativeController nativeController; + + private NativeControllerHolder() { + } + + /** + * Get a reference to the singleton native process controller. + * + * The NativeController is created lazily to allow time for the C++ process to be started before connection is attempted. + * + * null is returned to tests that haven't bothered to set up path.home and all runs where useNativeProcess=false. 
+ * + * Calls may throw an exception if initial connection to the C++ process fails. + */ + public static NativeController getNativeController(Settings settings) throws IOException { + + if (Environment.PATH_HOME_SETTING.exists(settings) && MachineLearning.USE_NATIVE_PROCESS_OPTION.get(settings)) { + synchronized (lock) { + if (nativeController == null) { + nativeController = new NativeController(new Environment(settings), new NamedPipeHelper()); + nativeController.tailLogsInThread(); + } + } + return nativeController; + } + return null; + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0bffeda4283..23015dae064 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -35,7 +35,8 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -51,7 +52,7 @@ public class AutodetectCommunicator implements Closeable { private final AutoDetectResultProcessor autoDetectResultProcessor; private final Consumer handler; - final AtomicBoolean inUse = new AtomicBoolean(false); + final AtomicReference inUse = new AtomicReference<>(); public AutodetectCommunicator(long taskId, Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler) { @@ -83,7 +84,7 @@ public class AutodetectCommunicator implements Closeable { DataCounts results = autoDetectWriter.write(countingStream); 
autoDetectWriter.flush(); return results; - }); + }, false); } @Override @@ -98,21 +99,22 @@ public class AutodetectCommunicator implements Closeable { autoDetectResultProcessor.awaitCompletion(); handler.accept(errorReason != null ? new ElasticsearchException(errorReason) : null); return null; - }); + }, true); } + public void writeUpdateModelDebugMessage(ModelDebugConfig config) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { autodetectProcess.writeUpdateModelDebugMessage(config); return null; - }); + }, false); } public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules); return null; - }); + }, false); } public void flushJob(InterimResultsParams params) throws IOException { @@ -120,7 +122,7 @@ public class AutodetectCommunicator implements Closeable { String flushId = autodetectProcess.flushJob(params); waitFlushToCompletion(flushId); return null; - }); + }, false); } private void waitFlushToCompletion(String flushId) throws IOException { @@ -171,16 +173,32 @@ public class AutodetectCommunicator implements Closeable { return taskId; } - private T checkAndRun(Supplier errorMessage, CheckedSupplier callback) throws IOException { - if (inUse.compareAndSet(false, true)) { + private T checkAndRun(Supplier errorMessage, CheckedSupplier callback, boolean wait) throws IOException { + CountDownLatch latch = new CountDownLatch(1); + if (inUse.compareAndSet(null, latch)) { try { checkProcessIsAlive(); return callback.get(); } finally { - inUse.set(false); + latch.countDown(); + inUse.set(null); } } else { - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + if (wait) { + latch = inUse.get(); + if (latch != null) { + try { + latch.await(); + } catch 
(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + } + } + checkProcessIsAlive(); + return callback.get(); + } else { + throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 467433a9585..7fba2cf459c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -19,9 +19,9 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -116,15 +116,9 @@ public class AutodetectProcessManager extends AbstractComponent { * @return Count of records, fields, bytes, etc written */ public DataCounts processData(String jobId, InputStream input, DataLoadParams params) { - JobState jobState = jobManager.getJobState(jobId); - if (jobState != JobState.OPENED) { - throw new IllegalArgumentException("job [" + jobId + "] state is [" + jobState + "], but must be [" - + JobState.OPENED + "] for processing data"); - } - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == 
null) { - throw new IllegalStateException("job [" + jobId + "] with state [" + jobState + "] hasn't been started"); + throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); } try { return communicator.writeToJob(input, params); @@ -168,23 +162,25 @@ public class AutodetectProcessManager extends AbstractComponent { } } - public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException { + public void writeUpdateProcessMessage(String jobId, List updates, ModelDebugConfig config) + throws IOException { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); return; } - communicator.writeUpdateModelDebugMessage(config); - // TODO check for errors from autodetects - } - public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List rules) throws IOException { - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); - if (communicator == null) { - logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); - return; + if (config != null) { + communicator.writeUpdateModelDebugMessage(config); + } + + if (updates != null) { + for (JobUpdate.DetectorUpdate update : updates) { + if (update.getRules() != null) { + communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); + } + } } - communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules); // TODO check for errors from autodetects } @@ -204,7 +200,10 @@ public class AutodetectProcessManager extends AbstractComponent { } setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1)); } - }, handler); + }, e1 -> { + logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); + setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1)); + }); } // TODO: 
add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index d3376adaa4d..24d6269896d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import java.io.IOException; @@ -80,7 +81,7 @@ public class StateProcessor extends AbstractComponent { try { logger.trace("[{}] ES API CALL: bulk index", jobId); BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(bytes, null, null); + bulkRequest.add(bytes, null, null, XContentType.JSON); client.bulk(bulkRequest).actionGet(); } catch (Exception e) { logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index ef78ce53d09..3b1756fcdac 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -47,6 +47,7 @@ public class CppLogMessageHandler implements Closeable { private final int errorStoreSize; private final Deque errorStore; private final 
CountDownLatch pidLatch; + private final CountDownLatch cppCopyrightLatch; private volatile boolean hasLogStreamEnded; private volatile boolean seenFatalError; private volatile long pid; @@ -70,6 +71,7 @@ public class CppLogMessageHandler implements Closeable { this.errorStoreSize = errorStoreSize; errorStore = ConcurrentCollections.newDeque(); pidLatch = new CountDownLatch(1); + cppCopyrightLatch = new CountDownLatch(1); hasLogStreamEnded = false; } @@ -133,7 +135,23 @@ public class CppLogMessageHandler implements Closeable { return pid; } - public String getCppCopyright() { + /** + * Get the process ID of the C++ process whose log messages are being read. This will + * arrive in the first log message logged by the C++ process. They all log a copyright + * message immediately on startup so it should not take long to arrive, but will not be + * available instantly after the process starts. + */ + public String getCppCopyright(Duration timeout) throws TimeoutException { + if (cppCopyright == null) { + try { + cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (cppCopyright == null) { + throw new TimeoutException("Timed out waiting for C++ process copyright"); + } + } return cppCopyright; } @@ -193,6 +211,7 @@ public class CppLogMessageHandler implements Closeable { String latestMessage = msg.getMessage(); if (cppCopyright == null && latestMessage.contains("Copyright")) { cppCopyright = latestMessage; + cppCopyrightLatch.countDown(); } // TODO: Is there a way to preserve the original timestamp when re-logging? 
if (jobId != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java index dd60318c0a6..d079cca978b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java @@ -37,4 +37,9 @@ public class RestPostDataAction extends BaseRestHandler { return channel -> client.execute(PostDataAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); } + + @Override + public boolean supportsContentStream() { + return true; + } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java index a3eaeddcdf2..0e6aa22a5c6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java @@ -54,6 +54,11 @@ public class MonitoringFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled(), exportersUsage(exporters)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java index e430d35ba02..bf5be288335 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java @@ -57,4 +57,5 @@ public class PersistentTask extends CancellableTask { public void setPersistentTaskId(long persistentTaskId) { this.persistentTaskId = persistentTaskId; } + } \ No newline at end of file diff --git 
a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java index 436206bdf9c..a94486e57f0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java @@ -93,6 +93,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable task, boolean stopped, String newExecutorNode) { this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, - newExecutorNode); + newExecutorNode, task.allocationId); } public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { - this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode); + this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, + task.executorNode, task.allocationId); } private PersistentTaskInProgress(long id, long allocationId, String action, Request request, - boolean stopped, boolean removeOnCompletion, Status status, String executorNode) { + boolean stopped, boolean removeOnCompletion, Status status, + String executorNode, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.action = action; @@ -220,6 +226,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setId(long id) { this.id = id; @@ -394,8 +417,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { + this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; + return this; + } + public PersistentTaskInProgress build() { - return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, 
executorNode); + return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, + executorNode, allocationIdOnLastStatusUpdate); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java index 8ba96c5c166..44d531d4287 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java @@ -83,6 +83,11 @@ public class SecurityFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { Map realmsUsage = buildRealmsUsage(realms); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/action/role/PutRoleRequestBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/security/action/role/PutRoleRequestBuilder.java index b6eff99606d..8b9901e99b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/action/role/PutRoleRequestBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/action/role/PutRoleRequestBuilder.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.security.authz.RoleDescriptor; @@ -31,15 +30,6 @@ public class PutRoleRequestBuilder extends ActionRequestBuilder nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled(), watcherService != null ? 
watcherService.usageStats() : Collections.emptyMap()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/ExecuteWatchRequest.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/ExecuteWatchRequest.java index b0e16965a61..d81acf90213 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/ExecuteWatchRequest.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/ExecuteWatchRequest.java @@ -141,16 +141,6 @@ public class ExecuteWatchRequest extends MasterNodeReadRequest { this(id, source.buildAsBytes(XContentType.JSON), XContentType.JSON); } - @Deprecated - public PutWatchRequest(String id, BytesReference source) { - this(id, source, source != null ? XContentFactory.xContentType(source) : null); - } - public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { this.id = id; this.source = source; @@ -81,16 +76,6 @@ public class PutWatchRequest extends MasterNodeRequest { setSource(source.buildAsBytes(XContentType.JSON), XContentType.JSON); } - /** - * Set the source of the watch - * @deprecated use {@link #setSource(BytesReference, XContentType)} - */ - @Deprecated - public void setSource(BytesReference source) { - this.source = source; - this.xContentType = XContentFactory.xContentType(source); - } - /** * Set the source of the watch */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/PutWatchRequestBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/PutWatchRequestBuilder.java index cc5eb158d31..3f33de2e05d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/PutWatchRequestBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/PutWatchRequestBuilder.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.transport.actions.put; import 
org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder; public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder { @@ -31,9 +32,10 @@ public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder listener = new PlainListenableActionFuture<>(client.threadPool()); - new MachineLearningClient(client).closeJob(new CloseJobAction.Request("foo"), listener); + CloseJobAction.Request request = new CloseJobAction.Request("foo"); + request.setTimeout(TimeValue.timeValueSeconds(30)); + new MachineLearningClient(client).closeJob(request, listener); listener.actionGet(); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index 42ca1ecebec..e47907d7e98 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.util.Collections; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; public class CloseJobActionTests extends ESTestCase { @@ -26,9 +27,7 @@ public class CloseJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, 
JobState.FAILED)); - + createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED)); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); @@ -52,10 +51,7 @@ public class CloseJobActionTests extends ESTestCase { public void testMoveJobToClosingState_unexpectedJobState() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); - task = new PersistentTaskInProgress<>(task, JobState.OPENING); - + PersistentTaskInProgress task = createJobTask(1L, "job_id", null, JobState.OPENING); ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobsActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobsActionResponseTests.java index dbbbe428e0f..36f8cee3d77 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobsActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobsActionResponseTests.java @@ -36,10 +36,10 @@ public class GetJobsActionResponseTests extends AbstractStreamableTestCase customConfig = randomBoolean() ? Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)) : null; String modelSnapshotId = randomBoolean() ? randomAsciiOfLength(10) : null; - String indexName = randomBoolean() ? 
"index" + j : null; - Job job = new Job(jobId, description, createTime, finishedTime, lastDataTime, - analysisConfig, analysisLimits, dataDescription, - modelDebugConfig, normalizationWindowDays, backgroundPersistInterval, - modelSnapshotRetentionDays, resultsRetentionDays, customConfig, modelSnapshotId, indexName, randomBoolean()); + String indexName = "index" + j; + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setDescription(description); + builder.setCreateTime(createTime); + builder.setFinishedTime(finishedTime); + builder.setLastDataTime(lastDataTime); + builder.setAnalysisConfig(analysisConfig); + builder.setAnalysisLimits(analysisLimits); + builder.setDataDescription(dataDescription); + builder.setModelDebugConfig(modelDebugConfig); + builder.setRenormalizationWindowDays(normalizationWindowDays); + builder.setBackgroundPersistInterval(backgroundPersistInterval); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); + builder.setResultsRetentionDays(resultsRetentionDays); + builder.setCustomSettings(customConfig); + builder.setModelSnapshotId(modelSnapshotId); + builder.setResultsIndexName(indexName); + builder.setDeleted(randomBoolean()); + Job job = builder.build(); jobList.add(job); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index f5dcf6f9f12..fc52d040940 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -41,16 +41,14 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.CLOSED, JobState.FAILED)); + createJobTask(1L, "job_id", 
"_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); - task = new PersistentTaskInProgress<>(task, JobState.OPENED); + task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); } @@ -79,19 +77,16 @@ public class OpenJobActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING); - task = new PersistentTaskInProgress<>(task, jobState); + PersistentTaskInProgress task = createJobTask(1L, "job_id", "_node_id", jobState); PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); Exception e = expectThrows(ElasticsearchStatusException.class, () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes)); assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); jobState = randomFrom(JobState.OPENING, JobState.CLOSING); - task = new PersistentTaskInProgress<>(task, jobState); + task = createJobTask(1L, "job_id", 
"_other_node_id", jobState); PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); e = expectThrows(ElasticsearchStatusException.class, @@ -124,7 +119,7 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger); assertEquals("_node_id3", result.getId()); } @@ -152,7 +147,7 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); assertNull(result); } @@ -174,8 +169,71 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); assertNull(result); } + public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, 
Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + Map> taskMap = new HashMap<>(); + taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING)); + taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING)); + taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING)); + taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING)); + taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING)); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs.build(), 2, logger); + assertEquals("_node_id3", result.getId()); + + PersistentTaskInProgress lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); + taskMap.put(5L, lastTask); + tasks = new PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because OPENING state", result); + + taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3")); + tasks = new PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + 
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because stale task", result); + + taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null)); + tasks = new PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because null state", result); + } + + public static PersistentTaskInProgress createJobTask(long id, String jobId, String nodeId, JobState jobState) { + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId); + task = new PersistentTaskInProgress<>(task, jobState); + return task; + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 0ada25abd2d..7cf26b471fd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.hamcrest.Matchers.equalTo; @@ -41,10 +42,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), 
Collections.singletonList("*"))); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), false, true, "node_id"); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED, - JobState.CLOSING, JobState.OPENING)); + JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING); + PersistentTaskInProgress task = createJobTask(0L, job.getId(), "node_id", jobState); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); DiscoveryNodes nodes = DiscoveryNodes.builder() @@ -61,7 +60,7 @@ public class StartDatafeedActionTests extends ESTestCase { DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build()); assertNull(node); - task = new PersistentTaskInProgress<>(task, JobState.OPENED); + task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -110,10 +109,7 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress jobTask = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - false, true, "node_id"); - jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, "node_id"); @@ -140,10 +136,7 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) 
.build(); - PersistentTaskInProgress jobTask = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - false, true, "node_id2"); - jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, "node_id1"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index d2cf6c3cfdc..b2c12c7b0fd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -22,10 +22,16 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; @@ -155,7 +161,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress 
task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); @@ -165,29 +171,28 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(JobState.OPENED, task.getStatus()); }); - // stop the only running ml node - logger.info("!!!!"); + logger.info("stop the only running ml node"); internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); ensureStableCluster(2); assertBusy(() -> { // job should get and remain in a failed state: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNull(task.getExecutorNode()); // The status remains to be opened as from ml we didn't had the chance to set the status to failed: assertEquals(JobState.OPENED, task.getStatus()); }); - // start ml node + logger.info("start ml node"); internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); ensureStableCluster(3); assertBusy(() -> { // job should be re-opened: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNotNull(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); @@ -197,6 +202,114 
@@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); + cleanupWorkaround(3); } + public void testMaxConcurrentJobAllocations() throws Exception { + int numMlNodes = 2; + internalCluster().ensureAtMostNumDataNodes(0); + // start non ml node, but that will hold the indices + logger.info("Start non ml node:"); + String nonMlNode = internalCluster().startNode(Settings.builder() + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + logger.info("Starting ml nodes"); + internalCluster().startNodes(numMlNodes, Settings.builder() + .put("node.data", false) + .put("node.master", false) + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + ensureStableCluster(numMlNodes + 1); + + int maxConcurrentJobAllocations = randomIntBetween(1, 4); + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), maxConcurrentJobAllocations)) + .get(); + + // Sample each cs update and keep track each time a node holds more than `maxConcurrentJobAllocations` opening jobs. 
+ List violations = new CopyOnWriteArrayList<>(); + internalCluster().clusterService(nonMlNode).addListener(event -> { + PersistentTasksInProgress tasks = event.state().metaData().custom(PersistentTasksInProgress.TYPE); + if (tasks == null) { + return; + } + + for (DiscoveryNode node : event.state().nodes()) { + Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { + return node.getId().equals(task.getExecutorNode()) && + (task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false); + }); + int count = foundTasks.size(); + if (count > maxConcurrentJobAllocations) { + violations.add("Observed node [" + node.getName() + "] with [" + count + "] opening jobs on cluster state version [" + + event.state().version() + "]"); + } + } + }); + + int numJobs = numMlNodes * 10; + for (int i = 0; i < numJobs; i++) { + Job.Builder job = createJob(Integer.toString(i)); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + } + + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNotNull(task.getExecutorNode()); + assertEquals(JobState.OPENED, task.getStatus()); + } + }); + + logger.info("stopping ml nodes"); + for (int i = 0; i < numMlNodes; i++) { + // fork so stopping all ml nodes proceeds quicker: + Runnable r = () -> { + try { + internalCluster() + .stopRandomNode(settings -> 
settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + } catch (IOException e) { + logger.error("error stopping node", e); + } + }; + new Thread(r).start(); + } + ensureStableCluster(1, nonMlNode); + assertBusy(() -> { + ClusterState state = client(nonMlNode).admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNull(task.getExecutorNode()); + } + }); + + logger.info("re-starting ml nodes"); + internalCluster().startNodes(numMlNodes, Settings.builder() + .put("node.data", false) + .put("node.master", false) + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + + ensureStableCluster(1 + numMlNodes); + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNotNull(task.getExecutorNode()); + assertEquals(JobState.OPENED, task.getStatus()); + } + }, 30, TimeUnit.SECONDS); + + assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size()); + cleanupWorkaround(numMlNodes + 1); + } + + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index dcb820efe27..eb43e5ab3a3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import 
org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -52,7 +53,8 @@ public class DatafeedJobIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data-empty", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data-empty", Collections.emptyMap(), + new StringEntity(mappings, ContentType.APPLICATION_JSON)); // Create index with source = enabled, doc_values = enabled, stored = false mappings = "{" @@ -66,12 +68,14 @@ public class DatafeedJobIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}", + ContentType.APPLICATION_JSON)); // Create index with source = enabled, doc_values = disabled (except time), stored = false mappings = "{" @@ -85,12 +89,15 @@ public class DatafeedJobIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), + new StringEntity(mappings, ContentType.APPLICATION_JSON)); 
client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}", + ContentType.APPLICATION_JSON)); // Create index with source = disabled, doc_values = enabled (except time), stored = true mappings = "{" @@ -105,12 +112,15 @@ public class DatafeedJobIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), + new StringEntity(mappings, ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}", + ContentType.APPLICATION_JSON)); // Create index with nested documents mappings = "{" @@ -122,12 +132,14 @@ public class DatafeedJobIT extends 
ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON)); client().performRequest("put", "nested-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}")); + new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "nested-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}")); + new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}", + ContentType.APPLICATION_JSON)); // Create index with multiple docs per time interval for aggregation testing mappings = "{" @@ -141,24 +153,33 @@ public class DatafeedJobIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data-aggs", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data-aggs", Collections.emptyMap(), + new StringEntity(mappings, ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/1", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/2", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}", + ContentType.APPLICATION_JSON)); 
client().performRequest("put", "airline-data-aggs/response/3", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/4", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/5", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/6", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/7", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data-aggs/response/8", Collections.emptyMap(), - new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}")); + new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}", + 
ContentType.APPLICATION_JSON)); // Ensure all data is searchable client().performRequest("post", "_refresh"); @@ -210,7 +231,7 @@ public class DatafeedJobIT extends ESRestTestCase { + "\"data_description\" : {\"time_field\":\"time stamp\"}" + "}"; client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), - new StringEntity(job)); + new StringEntity(job, ContentType.APPLICATION_JSON)); String datafeedId = "datafeed-" + jobId; String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," @@ -235,7 +256,7 @@ public class DatafeedJobIT extends ESRestTestCase { + "\"data_description\" : {\"time_field\":\"time stamp\"}" + "}"; client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), - new StringEntity(job)); + new StringEntity(job, ContentType.APPLICATION_JSON)); String datafeedId = "datafeed-" + jobId; String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"}," @@ -398,9 +419,8 @@ public class DatafeedJobIT extends ESRestTestCase { + " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n" + " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + "}"; - return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + id, - Collections.emptyMap(), new StringEntity(job)); + Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON)); } private static String responseEntityToString(Response response) throws Exception { @@ -419,7 +439,7 @@ public class DatafeedJobIT extends ESRestTestCase { + "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}" + "}"; client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), - new StringEntity(job)); + new StringEntity(job, 
ContentType.APPLICATION_JSON)); String datafeedId = jobId + "-datafeed"; new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").setSource(source).build(); @@ -478,7 +498,7 @@ public class DatafeedJobIT extends ESRestTestCase { + (aggregations == null ? "" : ",\"aggs\":" + aggregations) + "}"; return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(), - new StringEntity(datafeedConfig)); + new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON)); } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index f20ec8dcd29..2fdca2d2999 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -139,7 +140,7 @@ public class MlJobIT extends ESRestTestCase { + " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + " }\n" + "}"; return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, - Collections.emptyMap(), new StringEntity(job)); + Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON)); } public void testGetBucketResults() throws Exception { @@ -216,13 +217,15 @@ public class MlJobIT extends ESRestTestCase { String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1"); Response response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig)); + + "anomaly_detectors/repeated-id" , Collections.emptyMap(), + new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); assertEquals(200, 
response.getStatusLine().getStatusCode()); final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2"); ResponseException e = expectThrows(ResponseException.class, () ->client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig2))); + + "anomaly_detectors/repeated-id" , Collections.emptyMap(), + new StringEntity(jobConfig2, ContentType.APPLICATION_JSON))); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used.")); @@ -240,12 +243,12 @@ public class MlJobIT extends ESRestTestCase { String jobConfig = String.format(Locale.ROOT, jobTemplate, indexName); Response response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig)); + + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); assertEquals(200, response.getStatusLine().getStatusCode()); String jobId2 = "aliased-job-2"; response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig)); + + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); assertEquals(200, response.getStatusLine().getStatusCode()); response = client().performRequest("get", "_aliases"); @@ -307,7 +310,7 @@ public class MlJobIT extends ESRestTestCase { String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1); Response response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig)); + + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); 
assertEquals(200, response.getStatusLine().getStatusCode()); // Check the index mapping contains the first by_field_name @@ -319,7 +322,7 @@ public class MlJobIT extends ESRestTestCase { jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2); response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig)); + + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); assertEquals(200, response.getStatusLine().getStatusCode()); // Check the index mapping now contains both fields @@ -406,11 +409,11 @@ public class MlJobIT extends ESRestTestCase { "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", jobId, 123, 1, 1); client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123, - Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123, - Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123, - Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); client().performRequest("post", "_refresh"); @@ -461,7 +464,7 @@ public class MlJobIT extends ESRestTestCase { private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception { try { client().performRequest("put", 
AnomalyDetectorsIndex.jobResultsIndexName(jobId), - Collections.emptyMap(), new StringEntity(RESULT_MAPPING)); + Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON)); } catch (ResponseException e) { // it is ok: the index already exists assertThat(e.getMessage(), containsString("resource_already_exists_exception")); @@ -474,13 +477,13 @@ public class MlJobIT extends ESRestTestCase { String id = String.format(Locale.ROOT, "%s_%s_%s", jobId, timestamp, bucketSpan); return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + id, - Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult, ContentType.APPLICATION_JSON)); } private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception { try { client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId), Collections.emptyMap(), - new StringEntity(RESULT_MAPPING)); + new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON)); } catch (ResponseException e) { // it is ok: the index already exists assertThat(e.getMessage(), containsString("resource_already_exists_exception")); @@ -492,7 +495,7 @@ public class MlJobIT extends ESRestTestCase { "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", jobId, timestamp, bucketSpan, sequenceNum); return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + timestamp, - Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); } private static String responseEntityToString(Response response) throws Exception { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java 
b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 31702ef18f0..acbc1397377 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -72,7 +72,6 @@ public class MlRestTestStateCleaner { try { client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"); } catch (Exception e) { - logger.info("Test clean up close"); if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]") || e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 77957f1dfe6..e0e7f376bf3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -66,19 +66,17 @@ public class JobManagerTests extends ESTestCase { } public void testGetJobOrThrowIfUnknown_GivenUnknownJob() { - JobManager jobManager = createJobManager(); ClusterState cs = createClusterState(); - ESTestCase.expectThrows(ResourceNotFoundException.class, () -> jobManager.getJobOrThrowIfUnknown(cs, "foo")); + ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo")); } public void testGetJobOrThrowIfUnknown_GivenKnownJob() { - JobManager jobManager = createJobManager(); Job job = buildJobBuilder("foo").build(); MlMetadata mlMetadata = new MlMetadata.Builder().putJob(job, false).build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)).build(); - assertEquals(job, 
jobManager.getJobOrThrowIfUnknown(cs, "foo")); + assertEquals(job, JobManager.getJobOrThrowIfUnknown(cs, "foo")); } public void testGetJob_GivenJobIdIsAll() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index b3d46ef5be6..e8165f3b8d5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java index 475abdf30a8..bf85df60188 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java @@ -22,12 +22,12 @@ public class JobUpdateTests extends AbstractSerializingTestCase { @Override protected JobUpdate createTestInstance() { - String description = null; + JobUpdate.Builder update = new JobUpdate.Builder(); if (randomBoolean()) { - description = randomAsciiOfLength(20); + update.setDescription(randomAsciiOfLength(20)); } - List detectorUpdates = null; if (randomBoolean()) { + List detectorUpdates = null; int size = randomInt(10); detectorUpdates = new ArrayList<>(size); for (int i = 0; i < size; i++) { @@ -44,42 +44,34 @@ public class JobUpdateTests extends AbstractSerializingTestCase { } detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules)); } + update.setDetectorUpdates(detectorUpdates); } - ModelDebugConfig modelDebugConfig = null; if 
(randomBoolean()) { - modelDebugConfig = new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)); + update.setModelDebugConfig(new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10))); } - AnalysisLimits analysisLimits = null; if (randomBoolean()) { - analysisLimits = new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong()); + update.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong())); } - Long renormalizationWindowDays = null; if (randomBoolean()) { - renormalizationWindowDays = randomNonNegativeLong(); + update.setRenormalizationWindowDays(randomNonNegativeLong()); } - Long backgroundPersistInterval = null; if (randomBoolean()) { - backgroundPersistInterval = randomNonNegativeLong(); + update.setBackgroundPersistInterval(randomNonNegativeLong()); } - Long modelSnapshotRetentionDays = null; if (randomBoolean()) { - modelSnapshotRetentionDays = randomNonNegativeLong(); + update.setModelSnapshotRetentionDays(randomNonNegativeLong()); } - Long resultsRetentionDays = null; if (randomBoolean()) { - resultsRetentionDays = randomNonNegativeLong(); + update.setResultsRetentionDays(randomNonNegativeLong()); } - List categorizationFilters = null; if (randomBoolean()) { - categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false)); + update.setCategorizationFilters(Arrays.asList(generateRandomStringArray(10, 10, false))); } - Map customSettings = null; if (randomBoolean()) { - customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)); + update.setCustomSettings(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); } - return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval, - renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings); + return update.build(); } @Override @@ -89,7 +81,7 @@ public class JobUpdateTests extends 
AbstractSerializingTestCase { @Override protected JobUpdate parseInstance(XContentParser parser) { - return JobUpdate.PARSER.apply(parser, null); + return JobUpdate.PARSER.apply(parser, null).build(); } public void testMergeWithJob() { @@ -108,9 +100,18 @@ public class JobUpdateTests extends AbstractSerializingTestCase { List categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false)); Map customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)); - JobUpdate update = new JobUpdate("updated_description", detectorUpdates, modelDebugConfig, - analysisLimits, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - categorizationFilters, customSettings); + JobUpdate.Builder updateBuilder = new JobUpdate.Builder(); + updateBuilder.setDescription("updated_description"); + updateBuilder.setDetectorUpdates(detectorUpdates); + updateBuilder.setModelDebugConfig(modelDebugConfig); + updateBuilder.setAnalysisLimits(analysisLimits); + updateBuilder.setBackgroundPersistInterval(randomNonNegativeLong()); + updateBuilder.setResultsRetentionDays(randomNonNegativeLong()); + updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong()); + updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong()); + updateBuilder.setCategorizationFilters(categorizationFilters); + updateBuilder.setCustomSettings(customSettings); + JobUpdate update = updateBuilder.build(); Job.Builder jobBuilder = new Job.Builder("foo"); Detector.Builder d1 = new Detector.Builder("info_content", "domain"); @@ -143,11 +144,11 @@ public class JobUpdateTests extends AbstractSerializingTestCase { } public void testIsAutodetectProcessUpdate() { - JobUpdate update = new JobUpdate(null, null, null, null, null, null, null, null, null, null); + JobUpdate update = new JobUpdate.Builder().build(); assertFalse(update.isAutodetectProcessUpdate()); - update = new JobUpdate(null, null, new ModelDebugConfig(1.0, "ff"), 
null, null, null, null, null, null, null); + update = new JobUpdate.Builder().setModelDebugConfig(new ModelDebugConfig(1.0, "ff")).build(); assertTrue(update.isAutodetectProcessUpdate()); - update = new JobUpdate(null, Arrays.asList(mock(JobUpdate.DetectorUpdate.class)), null, null, null, null, null, null, null, null); + update = new JobUpdate.Builder().setDetectorUpdates(Arrays.asList(mock(JobUpdate.DetectorUpdate.class))).build(); assertTrue(update.isAutodetectProcessUpdate()); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 46271c52ce5..36248305f84 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.io.IOException; import java.util.Collections; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; @@ -146,11 +147,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>( - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), false, true, null), - JobState.CLOSED - ); + PersistentTaskInProgress task = createJobTask(0L, "1", null, JobState.CLOSED); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = 
expectThrows(ElasticsearchStatusException.class, () -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)))); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java index 211d995fd2d..142b4fabcca 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java @@ -18,6 +18,8 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.contains; @@ -25,9 +27,9 @@ import static org.mockito.Mockito.when; public class NativeControllerTests extends ESTestCase { - public void testNativeController() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - Environment env = new Environment(settings); + private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + + public void testStartProcessCommand() throws IOException { NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]); @@ -43,10 +45,34 @@ public class NativeControllerTests extends ESTestCase { command.add("--arg2=42"); command.add("--arg3=something with spaces"); - NativeController nativeController = new NativeController(env, namedPipeHelper); + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); nativeController.startProcess(command); assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", 
commandStream.toString(StandardCharsets.UTF_8.name())); } + + public void testGetNativeCodeInfo() throws IOException, TimeoutException { + + String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; + + NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); + ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) + .thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) + .thenReturn(commandStream); + + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); + nativeController.tailLogsInThread(); + Map nativeCodeInfo = nativeController.getNativeCodeInfo(); + + assertNotNull(nativeCodeInfo); + assertEquals(2, nativeCodeInfo.size()); + assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version")); + assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash")); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index cb7996f3ede..3f389eba20d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; import 
java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import static org.elasticsearch.mock.orig.Mockito.doAnswer; @@ -155,11 +156,11 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class))); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty())); } @@ -169,11 +170,11 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); InterimResultsParams params = mock(InterimResultsParams.class); expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params)); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.flushJob(params); } @@ -183,10 +184,12 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); - expectThrows(ElasticsearchStatusException.class, () -> communicator.close()); + CountDownLatch latch = mock(CountDownLatch.class); + communicator.inUse.set(latch); + communicator.close(); + verify(latch, times(1)).await(); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.close(); } @@ 
-195,10 +198,10 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class))); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class)); } @@ -208,10 +211,10 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); List rules = Collections.singletonList(mock(DetectionRule.class)); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules)); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.writeUpdateDetectorRulesMessage(0, rules); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index a15334aefb1..b6c30195cf5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import 
org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -93,7 +94,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); - givenAllocationWithState(JobState.OPENED); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); doAnswer(invocationOnMock -> { @@ -131,7 +131,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { manager.openJob("foo", 1L, false, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); - UpdatePersistentTaskStatusAction.Request expectedRequest = new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED); + UpdatePersistentTaskStatusAction.Request expectedRequest = + new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED); verify(client).execute(eq(UpdatePersistentTaskStatusAction.INSTANCE), eq(expectedRequest), any()); } @@ -270,19 +271,14 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals("[foo] exception while flushing job", e.getMessage()); } - public void testWriteUpdateModelDebugMessage() throws IOException { + public void testwriteUpdateProcessMessage() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); ModelDebugConfig debugConfig = mock(ModelDebugConfig.class); - manager.writeUpdateModelDebugMessage("foo", debugConfig); - verify(communicator).writeUpdateModelDebugMessage(debugConfig); - } - - public void testWriteUpdateDetectorRulesMessage() throws IOException { - AutodetectCommunicator communicator = 
mock(AutodetectCommunicator.class); - AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); List rules = Collections.singletonList(mock(DetectionRule.class)); - manager.writeUpdateDetectorRulesMessage("foo", 2, rules); + List detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules)); + manager.writeUpdateProcessMessage("foo", detectorUpdates, debugConfig); + verify(communicator).writeUpdateModelDebugMessage(debugConfig); verify(communicator).writeUpdateDetectorRulesMessage(2, rules); } @@ -302,7 +298,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo")); AutodetectProcessManager manager = createManager(communicator); - givenAllocationWithState(JobState.OPENED); InputStream inputStream = createInputStream(""); manager.openJob("foo", 1L, false, e -> {}); @@ -337,10 +332,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(autodetectProcess, times(1)).close(); } - private void givenAllocationWithState(JobState state) { - when(jobManager.getJobState("foo")).thenReturn(state); - } - private AutodetectProcessManager createManager(AutodetectCommunicator communicator) { Client client = mock(Client.class); return createManager(communicator, client); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java index 37d889a14d2..3e7ce2a5d41 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java @@ -42,9 +42,11 @@ public class CppLogMessageHandlerTests extends ESTestCase { handler.tailStream(); 
assertTrue(handler.hasLogStreamEnded()); - assertEquals(10211L, handler.getPid(Duration.ofMillis(1))); + // Since this is all being done in one thread and we know the stream has + // been completely consumed at this point the wait duration can be zero + assertEquals(10211L, handler.getPid(Duration.ZERO)); assertEquals("controller (64 bit): Version based on 6.0.0-alpha1 (Build b0d6ef8819418c) " - + "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright()); + + "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright(Duration.ZERO)); assertEquals("Did not understand verb 'a'\n", handler.getErrors()); assertFalse(handler.seenFatalError()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 55de9c375ad..8f4574853d9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -115,7 +115,7 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { deleteAllDatafeeds(client()); deleteAllJobs(client()); for (int i = 0; i < numNodes; i++) { - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(settings -> true); } internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false)); ensureStableCluster(1); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java index 68032d1455a..a57b3e61cbe 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java @@ -569,7 +569,7 @@ public class HttpExporterResourceTests 
extends AbstractPublishableHttpResourceTe when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(RestStatus.OK.getStatus()); - when(response.getEntity()).thenReturn(new StringEntity("{}")); + when(response.getEntity()).thenReturn(new StringEntity("{}", ContentType.APPLICATION_JSON)); when(client.performRequest(eq("GET"), startsWith("/.marvel-es-1-*"), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTemplateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTemplateTests.java index f52186f691a..231d36feeeb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTemplateTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTemplateTests.java @@ -60,9 +60,9 @@ public class LocalExporterTemplateTests extends MonitoringIntegTestCase { request.settings(Settings.builder().put("index.mapper.dynamic", false).build()); // notably absent are: kibana, logstash, and beats - request.mapping("cluster_info", "{\"enabled\": false}"); - request.mapping("node", "{\"enabled\": false}"); - request.mapping("fake", "{\"enabled\": false}"); + request.mapping("cluster_info", "{\"enabled\": false}", XContentType.JSON); + request.mapping("node", "{\"enabled\": false}", XContentType.JSON); + request.mapping("fake", "{\"enabled\": false}", XContentType.JSON); client().admin().indices().create(request).actionGet(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java index 25c9f00e432..3042e6cf096 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java @@ -162,7 +162,7 @@ public 
class BasicWatcherTests extends AbstractWatcherIntegrationTestCase { watchSource.endObject(); try { watcherClient.preparePutWatch("_name") - .setSource(watchSource.bytes()) + .setSource(watchSource.bytes(), watchSource.contentType()) .get(); fail(); } catch (ElasticsearchParseException e) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/transport/action/WatchRequestValidationTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/transport/action/WatchRequestValidationTests.java index 2bdd0880825..8991a934814 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/transport/action/WatchRequestValidationTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/transport/action/WatchRequestValidationTests.java @@ -69,19 +69,19 @@ public class WatchRequestValidationTests extends ESTestCase { } public void testPutWatchInvalidWatchId() { - ActionRequestValidationException e = new PutWatchRequest("id with whitespaces", BytesArray.EMPTY).validate(); + ActionRequestValidationException e = new PutWatchRequest("id with whitespaces", BytesArray.EMPTY, XContentType.JSON).validate(); assertThat(e, is(notNullValue())); assertThat(e.validationErrors(), hasItem("watch id contains whitespace")); } public void testPutWatchNullId() { - ActionRequestValidationException e = new PutWatchRequest(null, BytesArray.EMPTY).validate(); + ActionRequestValidationException e = new PutWatchRequest(null, BytesArray.EMPTY, XContentType.JSON).validate(); assertThat(e, is(notNullValue())); assertThat(e.validationErrors(), hasItem("watch id is missing")); } public void testPutWatchSourceNull() { - ActionRequestValidationException e = new PutWatchRequest("foo", (BytesReference) null).validate(); + ActionRequestValidationException e = new PutWatchRequest("foo", (BytesReference) null, XContentType.JSON).validate(); assertThat(e, is(notNullValue())); assertThat(e.validationErrors(), hasItem("watch source is missing")); } diff --git 
a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 82f39605214..4de016d6054 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -214,12 +214,23 @@ } - match: { job_id: "to-update" } + - do: + xpack.ml.open_job: + job_id: to-update + - do: xpack.ml.update_job: job_id: to-update body: > { "description":"Post update description", + "detectors": [{"index": 0, "rules": {"target_field_name": "airline", + "rule_conditions": [ { "condition_type": "numerical_actual", + "condition": {"operator": "gt", "value": "10" } } ] } }, + {"index": 1, "description": "updated description"}], + "model_debug_config": { + "bounds_percentile": 99.0 + }, "analysis_limits": { "model_memory_limit": 20 }, @@ -234,8 +245,11 @@ } - match: { job_id: "to-update" } - match: { description: "Post update description" } + - match: { model_debug_config.bounds_percentile: 99.0 } - match: { analysis_limits.model_memory_limit: 20 } - match: { analysis_config.categorization_filters: ["cat3.*"] } + - match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" } + - match: { analysis_config.detectors.1.detector_description: "updated description" } - match: { renormalization_window_days: 10 } - match: { background_persist_interval: 10800 } - match: { model_snapshot_retention_days: 30 } diff --git a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index 78b0d690675..ad05e719efc 100644 --- a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.integration; 
+import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -14,27 +16,42 @@ import org.elasticsearch.xpack.ml.MachineLearning; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentType.JSON; public class MlBasicMultiNodeIT extends ESRestTestCase { + @SuppressWarnings("unchecked") + public void testMachineLearningInstalled() throws Exception { + Response response = client().performRequest("get", "/_xpack"); + assertEquals(200, response.getStatusLine().getStatusCode()); + Map features = (Map) responseEntityToMap(response).get("features"); + Map ml = (Map) features.get("ml"); + assertNotNull(ml); + assertTrue((Boolean) ml.get("available")); + assertTrue((Boolean) ml.get("enabled")); + } + public void testMiniFarequote() throws Exception { - String jobId = "foo"; + String jobId = "foo1"; createFarequoteJob(jobId); Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); + assertBusy(this::assertSameClusterStateOnAllNodes); String postData = "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + "{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}"; response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data", - Collections.emptyMap(), new StringEntity(postData)); + Collections.emptyMap(), + new 
StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson")))); assertEquals(202, response.getStatusLine().getStatusCode()); Map responseBody = responseEntityToMap(response); assertEquals(2, responseBody.get("processed_record_count")); @@ -86,16 +103,18 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { + " }" + " }" + "}"; - client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings)); + client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}", + ContentType.APPLICATION_JSON)); client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}", + ContentType.APPLICATION_JSON)); // Ensure all data is searchable client().performRequest("post", "_refresh"); - String jobId = "foo"; + String jobId = "foo2"; createFarequoteJob(jobId); String datafeedId = "bar"; createDatafeed(datafeedId, jobId); @@ -148,7 +167,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { xContentBuilder.field("_source", true); xContentBuilder.endObject(); return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, - Collections.emptyMap(), new StringEntity(xContentBuilder.string())); + Collections.emptyMap(), new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON)); } private Response createFarequoteJob(String jobId) throws Exception { @@ -176,11 +195,35 @@ public class 
MlBasicMultiNodeIT extends ESRestTestCase { xContentBuilder.endObject(); return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, - Collections.emptyMap(), new StringEntity(xContentBuilder.string())); + Collections.emptyMap(), new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON)); } private static Map responseEntityToMap(Response response) throws IOException { return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false); } + // When open job api returns the cluster state on nodes other than master node or node that acted as coordinating node, + // may not have had the latest update with job state set to opened. This may fail subsequent post data, flush, or + // close calls until that node that is running the job task has applied the cluster state where job state has been set to opened. + // this method waits until all nodes in the cluster have the same cluster state version, so that such failures can be + // avoided in tests. Note that the job has been started on the node running the job task (autodetect process is running), + // this is just a workaround for inconsistency in cluster states that may happen for a small amount of time. 
+ private void assertSameClusterStateOnAllNodes(){ + assert getClusterHosts().size() > 1; + Set versions = new HashSet<>(); + for (HttpHost host : getClusterHosts()) { + try { + // Client round robins between cluster hosts: + Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true")); + assertEquals(200, response.getStatusLine().getStatusCode()); + int version = (Integer) responseEntityToMap(response).get("version"); + logger.info("Sampled version [{}]", version); + versions.add(version); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + assertEquals(1, versions.size()); + } + } diff --git a/qa/ml-disabled/src/test/java/org/elasticsearch/xpack/ml/integration/MlPluginDisabledIT.java b/qa/ml-disabled/src/test/java/org/elasticsearch/xpack/ml/integration/MlPluginDisabledIT.java index df057a9e379..464a020fccc 100644 --- a/qa/ml-disabled/src/test/java/org/elasticsearch/xpack/ml/integration/MlPluginDisabledIT.java +++ b/qa/ml-disabled/src/test/java/org/elasticsearch/xpack/ml/integration/MlPluginDisabledIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -47,7 +48,8 @@ public class MlPluginDisabledIT extends ESRestTestCase { xContentBuilder.endObject(); ResponseException exception = expectThrows(ResponseException.class, () -> client().performRequest("put", - MachineLearning.BASE_PATH + "anomaly_detectors/foo", Collections.emptyMap(), new StringEntity(xContentBuilder.string()))); + MachineLearning.BASE_PATH + "anomaly_detectors/foo", Collections.emptyMap(), + new StringEntity(xContentBuilder.string(), ContentType.APPLICATION_JSON))); assertThat(exception.getMessage(), containsString("No handler found for uri [/_xpack/ml/anomaly_detectors/foo] and method [PUT]")); } } diff 
--git a/qa/ml-single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java b/qa/ml-single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java index 174d24a410d..3e762daff5d 100644 --- a/qa/ml-single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java +++ b/qa/ml-single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.transforms; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Response; @@ -188,13 +189,13 @@ public class PainlessDomainSplitIT extends ESRestTestCase { private void createIndex(String name, Settings settings) throws IOException { assertOK(client().performRequest("PUT", name, Collections.emptyMap(), - new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }"))); + new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }", ContentType.APPLICATION_JSON))); } private void createIndex(String name, Settings settings, String mapping) throws IOException { assertOK(client().performRequest("PUT", name, Collections.emptyMap(), new StringEntity("{ \"settings\": " + Strings.toString(settings) - + ", \"mappings\" : {" + mapping + "} }"))); + + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON))); } public void testIsolated() throws Exception { @@ -205,7 +206,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase { createIndex("painless", settings.build()); client().performRequest("PUT", "painless/test/1", Collections.emptyMap(), - new StringEntity("{\"test\": \"test\"}")); + new StringEntity("{\"test\": \"test\"}", ContentType.APPLICATION_JSON)); client().performRequest("POST", "painless/_refresh"); Pattern pattern = 
Pattern.compile("domain_split\":\\[(.*?),(.*?)\\]"); @@ -230,7 +231,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase { " }\n" + " }\n" + " }\n" + - "}"); + "}", ContentType.APPLICATION_JSON); Response response = client().performRequest("GET", "painless/test/_search", Collections.emptyMap(), body); String responseBody = EntityUtils.toString(response.getEntity()); @@ -275,7 +276,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase { " }"; client().performRequest("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/painless", Collections.emptyMap(), - new StringEntity(job)); + new StringEntity(job, ContentType.APPLICATION_JSON)); client().performRequest("POST", MachineLearning.BASE_PATH + "anomaly_detectors/painless/_open"); // Create index to hold data @@ -304,14 +305,14 @@ public class PainlessDomainSplitIT extends ESRestTestCase { client().performRequest("PUT", "painless/test/" + time.toDateTimeISO() + "_" + j, Collections.emptyMap(), new StringEntity("{\"domain\": \"" + "bar.bar.com\", \"time\": \"" + time.toDateTimeISO() - + "\"}")); + + "\"}", ContentType.APPLICATION_JSON)); } } else { // Non-anomalous values will be what's seen when the anomaly is reported client().performRequest("PUT", "painless/test/" + time.toDateTimeISO(), Collections.emptyMap(), new StringEntity("{\"domain\": \"" + test.hostName + "\", \"time\": \"" + time.toDateTimeISO() - + "\"}")); + + "\"}", ContentType.APPLICATION_JSON)); } } @@ -330,10 +331,9 @@ public class PainlessDomainSplitIT extends ESRestTestCase { " }"; client().performRequest("PUT", MachineLearning.BASE_PATH + "datafeeds/painless", Collections.emptyMap(), - new StringEntity(body)); + new StringEntity(body, ContentType.APPLICATION_JSON)); client().performRequest("POST", MachineLearning.BASE_PATH + "datafeeds/painless/_start"); - boolean passed = awaitBusy(() -> { try { client().performRequest("POST", "/_refresh");